Communication Patterns in Cloud Haskell (Part 3)
Friday, 12 October 2012, by Edsko de Vries.
Filed under parallel, cloud-haskell.
Map-Reduce
In Part 1 and Part 2 of this series we described a number of ways in which we might compute the number of prime factors of a set of numbers in a distributed manner. Abstractly, these examples can be described as
a problem (computing the number of prime factors of 100 natural numbers) is split into subproblems (factorizing each number), those subproblems are solved by slave nodes, and the partial results are then combined (summing the number of factors)
This is reminiscent of Google's Map-Reduce algorithm (MapReduce: Simplified Data Processing on Large Clusters, Dean and Ghemawat, OSDI'04), with the "Map" part corresponding to the computation of partial results and the "Reduce" part corresponding to combining these results. We will explain the basic ideas behind Map-Reduce before giving a distributed implementation using work-stealing.
Local map-reduce
The exposition we give in this section is based on Google's MapReduce Programming Model -- Revisited by Ralf Laemmel (SCP 2008). The Map-Reduce algorithm transforms key-value maps; in Haskell, its type is given by
-- The type of Map-Reduce skeletons (provided by the user)
data MapReduce k1 v1 k2 v2 v3 = MapReduce {
    mrMap    :: k1 -> v1 -> [(k2, v2)]
  , mrReduce :: k2 -> [v2] -> v3
  }

-- The driver (which "executes" a Map-Reduce skeleton)
localMapReduce :: Ord k2 => MapReduce k1 v1 k2 v2 v3 -> Map k1 v1 -> Map k2 v3
We start with a key-value map (with keys of type k1 and values of type v1). With the help of the "Map" (mrMap) part of a Map-Reduce skeleton, each of these key-value pairs is turned into a list of key-value pairs (with keys of type k2 and values of type v2); note that this list may (and typically does) contain multiple pairs with the same key. The Map-Reduce driver then collects all values for each key, and finally reduces this list of values (of type v2) to a single value (of type v3) using the "Reduce" (mrReduce) part of the skeleton. (Colour changes in the diagram indicate type changes.)
Exercise 6: Implement localMapReduce. The types pretty much say what needs to be done.
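One possible solution is sketched below; the groupByKey and reducePerKey helpers introduced here are the same ones the distributed driver later in this post relies on:

import           Data.Map (Map)
import qualified Data.Map as Map

-- Collect every value produced by the Map phase, grouped per key.
-- (Map.fromListWith (++) accumulates each group in reverse order of
-- appearance; that is fine for order-insensitive reducers such as sum.)
groupByKey :: Ord k2 => [(k2, v2)] -> Map k2 [v2]
groupByKey = Map.fromListWith (++) . map (\(k, v) -> (k, [v]))

-- Reduce each group of values to a single value using the "Reduce" part
reducePerKey :: MapReduce k1 v1 k2 v2 v3 -> Map k2 [v2] -> Map k2 v3
reducePerKey mr = Map.mapWithKey (mrReduce mr)

localMapReduce :: Ord k2 => MapReduce k1 v1 k2 v2 v3 -> Map k1 v1 -> Map k2 v3
localMapReduce mr =
    reducePerKey mr . groupByKey . concatMap (uncurry (mrMap mr)) . Map.toList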
Consider counting the number of words in a set of documents; that is, we want to transform a Map FilePath Document to a Map Word Frequency. We can do this with the following Map-Reduce skeleton:
countWords :: MapReduce FilePath Document Word Frequency Frequency
countWords = MapReduce {
    mrMap    = const (map (, 1) . words)
  , mrReduce = const sum
  }
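For this skeleton to compile we need the TupleSections language extension (for the (, 1) section) as well as some type synonyms; the excerpt does not spell these out, so the following is an assumption:

{-# LANGUAGE TupleSections #-}

-- Assumed synonyms, matching the types used above. Note that a modern
-- Prelude exports a Word type of its own, so we hide it to avoid ambiguity.
import Prelude hiding (Word)

type Document  = String
type Word      = String
type Frequency = Int

With the localMapReduce sketch from Exercise 6 we then get, for example:

localMapReduce countWords (Map.fromList [("a.txt", "foo bar foo")])
-- fromList [("bar",1),("foo",2)]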
Distributed map-reduce
We are going to have slave nodes execute the Map part of the algorithm, using work-stealing to distribute the work amongst the slaves. We are not going to distribute the Reduce part of the algorithm, but will instead run it on a single machine. For now we give a monomorphic implementation of the distributed Map-Reduce algorithm, tailored specifically to the countWords example from the previous section. In the next post we will see how to make the implementation polymorphic.
The implementation follows the work-stealing example from Part 1 very closely. The slave asks the queue for work and executes it, using the mrMap part of the Map-Reduce skeleton:
mapperProcess :: (ProcessId, ProcessId, Closure (MapReduce String String String Int Int))
              -> Process ()
mapperProcess (master, workQueue, mrClosure) = do
    us <- getSelfPid
    mr <- unClosure mrClosure
    go us mr
  where
    go us mr = do
      -- Ask the queue for work
      send workQueue us
      -- Wait for a reply; if there is work, do it and repeat; otherwise, exit
      receiveWait
        [ match $ \(key, val) -> send master (mrMap mr key val) >> go us mr
        , match $ \() -> return ()
        ]

remotable ['mapperProcess]
Note that the slave wants a Closure of a Map-Reduce skeleton; since the Map-Reduce skeleton itself contains functions, it is not serializable.
The master process is as follows:

distrMapReduce :: Closure (MapReduce String String String Int Int)
               -> [NodeId]
               -> Map String String
               -> Process (Map String Int)
distrMapReduce mrClosure mappers input = do
  mr     <- unClosure mrClosure
  master <- getSelfPid

  workQueue <- spawnLocal $ do
    -- Return the next bit of work to be done
    forM_ (Map.toList input) $ \(key, val) -> do
      them <- expect
      send them (key, val)

    -- Once all the work is done tell the mappers to terminate
    replicateM_ (length mappers) $ do
      them <- expect
      send them ()

  -- Start the mappers
  forM_ mappers $ \nid ->
    spawn nid ($(mkClosure 'mapperProcess) (master, workQueue, mrClosure))

  -- Wait for the partial results
  partials <- replicateM (Map.size input) expect

  -- We reduce on this node
  return (reducePerKey mr . groupByKey . concat $ partials)
We can now implement "distributed word counting" as:

countWords_ :: () -> MapReduce FilePath Document Word Frequency Frequency
countWords_ () = countWords

remotable ['countWords_]

distrCountWords :: [NodeId] -> Map FilePath Document -> Process (Map Word Frequency)
distrCountWords = distrMapReduce ($(mkClosure 'countWords_) ())
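To actually run this we still need to start the master and slave nodes. Below is a minimal sketch using the distributed-process-simplelocalnet backend; the host, port, and input documents are made up for illustration, and since remotable may only be invoked once per module, the two splices above would in practice be merged or placed in separate modules whose remote tables are composed:

import           Control.Distributed.Process
import           Control.Distributed.Process.Backend.SimpleLocalnet
import           Control.Distributed.Process.Node (initRemoteTable)
import           Control.Monad.IO.Class (liftIO)
import qualified Data.Map as Map
import           System.Environment (getArgs)

main :: IO ()
main = do
  args <- getArgs
  case args of
    ["master", host, port] -> do
      backend <- initializeBackend host port rtable
      startMaster backend $ \slaves -> do
        result <- distrCountWords slaves (Map.fromList
                    [("a.txt", "the quick brown fox"), ("b.txt", "the fox")])
        liftIO (print result)
        terminateAllSlaves backend
    ["slave", host, port] -> do
      backend <- initializeBackend host port rtable
      startSlave backend
    _ -> putStrLn "usage: PROG (master|slave) HOST PORT"
  where
    -- __remoteTable is generated by the remotable splice(s)
    rtable = __remoteTable initRemoteTable

Starting a few slaves and then the master on the same network should then print something like fromList [("brown",1),("fox",2),("quick",1),("the",2)].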
Performance
If we use ThreadScope to look at how busy the nodes are, we find alternating activity between the master node (which does the reduction) and the mapper nodes:

[ThreadScope plot: master node at the top, mapper nodes below; activity alternates between them]

This is unsurprising, of course; we have a distributed implementation of the Map phase but we reduce locally.
Exercise 7: Change the implementation so that the reduce step happens in parallel too and confirm that the work distribution is now better.
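A cheap first step, sketched below, is to evaluate the per-key reductions in parallel across the master's cores with Control.Parallel.Strategies; the exercise proper would instead ship each group of values to reducer processes on the slaves, mirroring the mapper setup above. (parReducePerKey is a hypothetical name, not part of the original code.)

import           Control.DeepSeq (NFData)
import           Control.Parallel.Strategies (parMap, rdeepseq)
import           Data.Map (Map)
import qualified Data.Map as Map

-- Evaluate each per-key reduction as a separate spark
parReducePerKey :: (Ord k2, NFData k2, NFData v3)
                => MapReduce k1 v1 k2 v2 v3 -> Map k2 [v2] -> Map k2 v3
parReducePerKey mr =
  Map.fromList . parMap rdeepseq (\(k, vs) -> (k, mrReduce mr k vs)) . Map.toList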
Note: as hinted at by the ThreadScope plot above, it is possible to use ThreadScope to look at distributed applications running in multiple OS processes, including on different physical hosts, although doing so is currently a somewhat manual process. In addition to the normal steps of linking the application with the -eventlog flag and running it with +RTS -l -RTS, there are a few steps to carry out after the program completes: collect the .eventlog files from each node; use the ghc-events merge command to merge all the eventlog files into one (which currently has to be done in several steps, because the merge command only accepts a pair of files at a time); and finally use ThreadScope to view the combined eventlog.
To be continued
In the next blog post we will look at how we can make the distributed Map-Reduce skeleton polymorphic. We'll also look at k-means as an example of an algorithm that can be implemented using Map-Reduce. In fact, for efficiency, k-means requires a "multi-shot" Map-Reduce, and we'll see how Haskell and Cloud Haskell give us the flexibility we need for this kind of extension.