Communication Patterns in Cloud Haskell (Part 2)
Monday, 08 October 2012, by Edsko de Vries.
Filed under parallel, cloud-haskell.
Performance
In Part 1 of this series we introduced a number of basic Cloud Haskell communication patterns. All these patterns work fine for modest numbers of work items; in this blog post we're going to look at performance and memory use for much larger numbers of work items, so we can see how each pattern scales up. We will see that scalability improves as we move from master-slave to work-pushing to work-stealing, and that many further improvements can still be made. More importantly, we hope to help you predict runtime behaviour from your code, so that you can write more scalable and higher-performance applications.
Predicting resource use and performance
When thinking about Cloud Haskell performance you should consider things such as:
- Bandwidth. How many messages are you sending, and how large are those messages? Network bandwidth is typically several orders of magnitude lower than memory bandwidth, so it is important to be aware of the cost of messaging. In fact, an important design criterion in the development of Cloud Haskell was that this cost should be visible to the programmer.
- Latency. It takes time for even the smallest message to arrive at a remote
node. Synchronous operations wait for an acknowledgement from the remote
endpoint and hence imply a full network roundtrip. Hence synchronous
operations are often much slower than asynchronous operations, which fire
off a message to the remote endpoint but don't wait to see when it arrives.
For example, on many networks the bandwidth and latency are such that sending a hundred small messages in quick succession and then waiting for a single reply takes about the same time as sending just one message and waiting for a reply (see the sketch after this list).
The Cloud Haskell documentation will tell you which primitives are synchronous and which are not, but often you should be able to guess based on the result type. For example, send is asynchronous (result type ()), but spawn is synchronous: it sends a Closure to the remote endpoint, and then waits for the remote endpoint to reply with the process ID of the newly created process. (There is also an asynchronous version of spawn, as we will see below.)
- Message Queue Size. When messages are sent to a process they will sit in that process's message queue (or "mailbox") until they are processed. These messages are kept in memory. How much memory this uses depends on the number of messages and their size.
- Number of Processes. Cloud Haskell processes are implemented as Haskell threads plus some additional data. They are therefore relatively lightweight, but not free.
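To make the latency point concrete, here is a minimal sketch (referred to in the Latency item above) contrasting the two styles. It is not part of the examples in this series: the names syncRoundTrips and asyncBatch are ours, and we assume a hypothetical peer process that replies with a () acknowledgement to every message in the first case and once per batch in the second. Only send, expect and getSelfPid are standard Cloud Haskell primitives.

import Control.Monad (replicateM_)
import Control.Distributed.Process

-- Synchronous style: block for an acknowledgement after every message,
-- so each of the k iterations pays a full network round-trip.
syncRoundTrips :: ProcessId -> Int -> Process ()
syncRoundTrips peer k = do
  us <- getSelfPid
  replicateM_ k $ do
    send peer us        -- the send itself is asynchronous ...
    () <- expect        -- ... but here we wait for the peer's () reply
    return ()

-- Asynchronous style: fire off all k messages back to back and wait for a
-- single reply at the end; this costs roughly one round-trip in total.
asyncBatch :: ProcessId -> Int -> Process ()
asyncBatch peer k = do
  us <- getSelfPid
  replicateM_ k (send peer us)
  expect                -- single () acknowledgement for the whole batch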
You should try to make a mental comparison between the Master-Slave, Work-Pushing and Work-Stealing examples before reading the analysis below.
The Master process in the Master-Slave setup
In the discussion of the master-slave setup we mentioned that spawning a separate process for each subcomputation up-front might be too resource intensive on the client side. What we didn't mention, however, is that the master process we defined there is also resource hungry.
Heap profile of the master process in the Master-Slave example (single slave, factorizing the first n = 5000 numbers):
1. As defined above
2. Using reconnect
3. Using spawnAsync
It is obvious from the heap profile that the master process has a memory leak. But where is that coming from? The problem arises when we spawn the child processes:
spawnLocal $
  forM_ (zip [1 .. n] (cycle slaves)) $ \(m, there) ->
    spawn there ($(mkClosure 'slave) (us, m))
In order to deal with network failure at any point during the spawning process,
spawning involves sending a message to the newly started remote process that
says "it's okay to start running now". In order to provide reliable and ordered
message passing, Cloud Haskell must maintain some state for every destination
that a process sends messages to. This means that the master process must maintain state for n outgoing connections, since it starts a process for each subcomputation.
In a future version of Cloud Haskell this problem might be solved
automatically, but for now we can solve it manually by using the reconnect
primitive. The reconnect
primitive is primarily meant to be used in
situations where processes get disconnected from each other, but as a side
effect it lets us release the connection state, and that's the effect we need
for our workaround here:
spawnLocal $
  forM_ (zip [1 .. n] (cycle slaves)) $ \(m, there) -> do
    them <- spawn there ($(mkClosure 'slave) (us, m))
    reconnect them
This yields heap profile (2), above, and we see that the memory leak has disappeared.
However, there is another problem with this master. If we compare performance,
we find that it is more than an order of magnitude slower than the master
process in the work-pushing version. The reason is that spawn
is synchronous:
we wait for the remote node to tell us the process ID of the newly started
process. That means that the execution of the master looks something like
spawn first process
wait for reply
spawn second process
wait for reply
...
Synchronous message passing is much slower than asynchronous message passing,
which is why the work-pushing master is so much faster. However, spawn
is
actually implemented in terms of spawnAsync
, which we can use directly:
spawnLocal $
  forM_ (zip [1 .. n] (cycle slaves)) $ \(m, there) -> do
    spawnAsync there ($(mkClosure 'slave) (us, m))
    _ <- expectTimeout 0 :: Process (Maybe DidSpawn)
    return ()
This yields a similar heap profile (profile 3, above), but is much faster. We
use the expectTimeout
function to check – without actually waiting – and
discard DidSpawn
notifications (which are notifications from the remote node
that tell us the PID of new processes); this avoids a build-up in our message
queue, which would show up in the heap profile as a new memory leak. For every spawnAsync we issue we remove at most one DidSpawn message; any DidSpawn messages that are "left over" when we finish spawning will simply be discarded, because the process that did the spawning will have terminated.
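To see more precisely where spawn's extra round-trip comes from, here is an illustrative sketch of how a synchronous spawn can be expressed in terms of spawnAsync. The name spawnSync is ours; the real spawn in distributed-process works along these lines but takes more care around failures.

import Control.Distributed.Process

-- Illustrative sketch only: a synchronous spawn built on top of spawnAsync.
spawnSync :: NodeId -> Closure (Process ()) -> Process ProcessId
spawnSync nid proc = do
  ref <- spawnAsync nid proc             -- fire off the Closure ...
  receiveWait                            -- ... then block for the matching reply
    [ matchIf (\(DidSpawn ref' _)   -> ref' == ref)
              (\(DidSpawn _    pid) -> return pid)
    ]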
Comparing the Master processes across Master-Slave, Work-Pushing and Work-Stealing
The heap profile of the Master-Slave master process (with the memory leak fixed) is pretty similar to the heap profiles of the master processes in the work-pushing and work-stealing examples:
Heap profiles of the master processes (single slave, factorizing the first n = 50,000 numbers):
1. In the master-slave setup
2. In the work-pushing setup
3. In the work-stealing setup
Comparing the Slave processes across Master-Slave, Work-Pushing and Work-Stealing
The heap profiles of the slave processes are, however, more interesting:
Heap profiles of the slave processes (single slave, factorizing the first n = 50,000 numbers):
1. In the master-slave setup
2. In the work-pushing setup
3. In the work-stealing setup
As expected, the slave in the Master-Slave setup is the most resource hungry. A large amount of memory is used up when the slave starts; most of this is taken up by messages to the slave's node controller telling it which processes to spawn. The heap profile of the work-pushing slave has a similar shape, but with a much lower peak: in this case the memory is used to store the messages (containing just an integer) from the master to the slave process.
The heap profile of the work-stealing slave looks different. The increase in size you see comes from the algorithm used: it takes more memory to compute the number of prime factors of larger numbers. Note, however, that this only comes to 550 kB in total; the master-slave and work-pushing slaves had the same behaviour, but there the memory used by the factorization algorithm was insignificant.
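For context, the work each slave performs is counting prime factors. The actual numPrimeFactors from Part 1 is not reproduced here; the version below is only a hypothetical trial-division sketch, meant to give a feel for the per-item computation whose footprint shows up in the work-stealing profile.

{-# LANGUAGE BangPatterns #-}

-- Hypothetical sketch (not the code from Part 1): count prime factors,
-- with multiplicity, by trial division.
numPrimeFactors :: Integer -> Integer
numPrimeFactors = go 0 2
  where
    go :: Integer -> Integer -> Integer -> Integer
    go !acc d n
      | n == 1         = acc
      | d * d > n      = acc + 1                    -- what remains is prime
      | n `mod` d == 0 = go (acc + 1) d (n `div` d)
      | otherwise      = go acc (d + 1) n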
Execution time
Finally, let's plot execution time versus number of slaves:
Execution time (in seconds) versus number of slaves
Ignoring the absolute values, we can conclude that the Master-Slave example has a lot of overhead and is significantly slower than the other two setups. The work-pushing setup is slightly faster than the work-stealing setup, probably due to the extra round-trip incurred when slaves ask for work (the latency issue we discussed above). Finally, speedup is much less than linear in the number of slaves; this is due to the relatively high communication overhead compared to the cost of the actual computation.
Exercise 5: Extend the time/memory comparison to the work-stealing-with-low-latency setup from Exercise 4 (from Part 1). You should find that it is as fast as the work-pushing setup and that its heap profile is comparable to that of the work-stealing setup.
To be continued
In the next blog post we will see how these communication patterns can be generalized as a Map-Reduce skeleton.