Up until now, we have been considering programs that run on a single
machine, while possibly making use of multiple processors to exploit
parallelism. But there is a far more plentiful source of parallelism: running a program on multiple machines
simultaneously. We call this distributed programming, and
Haskell supports it through a framework called distributed-process.[50]
Aside from the obvious advantages of multimachine parallelism, there are other reasons to write distributed programs; for example, the resources a program needs, such as a database, might be available only on particular machines.
So what should distributed programming look like from the programmer’s
perspective? Should it look like Concurrent Haskell, with forkIO,
MVar, and STM? In fact, there are some good reasons to treat distributed computation very differently from computation on a shared-memory multicore: communication between machines is far slower and less reliable than access to shared memory, and parts of a distributed program can fail independently while the rest continues running.
For these reasons, the Haskell developers decided that the model for
distributed programming should be based on explicit message
passing, and not the MVar and STM models that we provide for
shared-memory concurrency.[51] Think of it as having TChan be the basic
primitive available for communication. It is possible to build
higher-level abstractions on top of the explicit message-passing
layer, just as we built higher-level abstractions on top of STM and
MVar in earlier chapters.
There is no built-in support for distributed programming in Haskell. It is all implemented as libraries using the concurrency facilities we have covered in earlier chapters.
The package providing the core APIs for distributed programming is
called distributed-process. It must be used
together with a separate transport layer package that provides
infrastructure for sending and receiving messages between nodes in the
distributed network. The distributed-process package is
deliberately independent of the transport layer so we can plug
in different transport layer implementations. The most common
transport layer is likely to be TCP/IP, as provided by the network-transport-tcp package, but we could imagine a transport
layer that used shared memory to communicate among multiple nodes on
the same multicore machine, or transport layers supporting some of the
faster networks designed for clusters, such as InfiniBand.
Each transport layer needs a different mechanism for creating and
shutting down nodes on the network and discovering which nodes are
available (peer discovery). We will be using the package
distributed-process-simplelocalnet that provides a simple
implementation on top of the network-transport-tcp transport layer.
At the time of writing, the distributed-process framework is
somewhat new and a little rough around the edges, but it is already
quite fully featured and we expect it to mature in due
course.[52]
It is reasonable to wonder whether we even need a framework to do
distributed message-passing. After all, can’t we just use the
network package directly and program our own message passing?
Certainly you could do this, but the packages described in this chapter provide a lot
of functionality that makes it much easier to build a distributed
application. They let you think about your application as a single program that happens to run on multiple machines, rather than a collection of
programs running on different machines that talk to one another.
For example, with the distributed-process framework, we can call a
function spawn that spawns a process (like a thread) on a different
machine, and we can exchange messages with the remote process directly in the
form of Haskell data types. Even though we are writing a single
program to execute on multiple machines, there is no need for all the
machines to be identical; indeed, programmers often want to
exploit some nonuniformity. For example, we might want to run a
caching service on a machine with lots of memory while sending
compute-intensive tasks to machines with lots of fast cores. There
may also be nonuniformity in the network topology. We might want to
perform a database query on a machine close to the database server,
for example, or put services that communicate with each other
frequently close to one another in the network.
The distributed-process framework provides a whole suite of infrastructure supporting the distributed application domain, including remote process creation, message passing, typed channels, and process monitoring for failure detection; we will meet each of these facilities over the course of this chapter.
We have included distribution in the concurrency part of this book for the simple reason that the explicit message-passing API we’ll describe is concurrent and nondeterministic. And yet, the main reason to want to use distribution is to exploit the parallelism of running on multiple machines simultaneously. So this setting is similar to parallel programming using threads described in Chapter 13, except that here we have only message passing and no shared state for coordination.
It is a little unfortunate that we have to resort to a
nondeterministic programming model to achieve parallelism just
because we want to exploit multiple machines. There are efforts
under way to build deterministic programming models atop the
distributed-process framework, although at the time of writing these projects are too
experimental to include in this book.[53]
To get acquainted with the basics of distributed programming, we will begin with a simple example: a ping/pong message exchange. Initially, there will be a single master process that creates a child process. The master process sends a "ping" message to the child, the child responds with a "pong" message, and the program then exits.
The ping example will illustrate the basic pattern for setting up a
program to use the distributed-process framework and introduce the APIs for
creating processes and simple message passing. The first version of
the program will run on a single node (machine) so we can
get familiar with the basics of the interface before moving on to
working with multiple nodes.
For reference, the subset of the Control.Distributed.Process API that we will be using is shown here:
data Process        -- instance Monad, MonadIO
data NodeId         -- instance Eq, Ord, Show, Typeable, Binary
data ProcessId      -- instance Eq, Ord, Show, Typeable, Binary

getSelfPid  :: Process ProcessId
getSelfNode :: Process NodeId

spawn  :: NodeId -> Closure (Process ()) -> Process ProcessId
send   :: Serializable a => ProcessId -> a -> Process ()
expect :: Serializable a => Process a

terminate :: Process a
say       :: String -> Process ()
First, a bit of terminology. A distributed program consists of a set of
processes that may communicate with one another by sending and
receiving messages. A process is like a thread. Processes run
concurrently with one another, and every process has
a unique ProcessId. There are a
couple of important differences between threads and processes,
however:
- A process can be created on a remote node, whereas a thread is always created on the current node.
- Processes run in the Process monad, rather than the IO monad. Process is an instance of MonadIO, so you can perform IO operations in Process by wrapping them in liftIO. All message-passing operations are in Process, so only processes, not threads, can engage in message passing.
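For example, here is a small sketch (the helper and its name are hypothetical) of performing an IO action inside the Process monad:

-- Needs: import Data.Time (getCurrentTime)
logTime :: Process ()
logTime = do
  now <- liftIO getCurrentTime       -- lift an ordinary IO action into Process
  say ("the time is " ++ show now)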
We start by defining the type of messages that our processes will send and receive:
distrib-ping/ping.hs
data Message = Ping ProcessId
             | Pong ProcessId
  deriving (Typeable, Generic)   -- (1)

instance Binary Message          -- (2)
The Ping message contains the ProcessId of the process that sent
it so that the target of the message knows where to send the response.
The Pong response also includes the ProcessId of the responder so
that the master process can tell which process a particular response
comes from.
Messages in a distributed program can be sent over the network, which involves serializing the Haskell data into a stream of bytes before it is sent and deserializing the bytes back into Haskell data at
the other end. The distributed-process framework uses the Binary
class from the binary package to implement serialization and
deserialization, and hence every message type must be an instance of
Binary.
The serialization format is under your control. If you want, you can
define your own Binary instance that uses a specialized
serialization format. Normally, however, you’ll just want an
automatically derived Binary instance. Fortunately, the binary
package[54] lets you derive
Binary instances using GHC's DeriveGeneric extension.[55] To do this, we first derive the Generic class (1) and then declare an instance of Binary for Message (2); GHC fills in the method definitions of this instance for us.
Message types must also be an instance of Typeable, because they can be sent to dynamically typed channels (more about this later). For Typeable, we can derive the instance directly in the deriving clause (1).
Typeable and Binary are normally packaged up together and referred
to as Serializable using the following class provided by Control.Distributed.Process.Serializable:
class    (Binary a, Typeable a) => Serializable a
instance (Binary a, Typeable a) => Serializable a
There’s nothing magic about Serializable. Just think of
Serializable a as shorthand for (Binary a, Typeable a). You’ll
see Serializable used a lot in the Control.Distributed.Process APIs.
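For instance, the constraint lets us write our own polymorphic messaging helpers. The following one-liner (a hypothetical utility, not part of the API) sends the same message to two processes:

sendBoth :: Serializable a => ProcessId -> ProcessId -> a -> Process ()
sendBoth p q msg = send p msg >> send q msg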
Next, we’ll write the code for a “ping server” process. The ping
server must wait for a Ping message and then respond with a
Pong message.
pingServer :: Process ()
pingServer = do
  Ping from <- expect                                -- (1)
  say $ printf "ping received from %s" (show from)   -- (2)
  mypid <- getSelfPid                                -- (3)
  send from (Pong mypid)                             -- (4)
First of all, notice that we are in the Process monad. As we
mentioned earlier, virtually all of the Control.Distributed.Process API is in this monad,
and only code running in the Process monad can communicate with
other processes and spawn new processes. There has to be a way to get
into Process in the first place; we’ll see how that happens
shortly, but for now let’s assume we’re already in Process and we
need to program the ping server.
At (1) we receive the next message using expect:
expect :: Serializable a => Process a
The expect function receives a message sent directly to this
process. Each process has a channel associated with it, and the
channel can receive messages of any type. The expect call receives
a message of a particular type, where the type is determined by the
context. If the type cannot be determined, the compiler will
complain that the type is ambiguous, and the usual fix is to add a
type signature. In the example just shown, the type of messages to receive
is determined by the pattern match on the result, which matches
directly on the Ping constructor and thus forces expect to receive
messages of the type Message.
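If the result is not pattern-matched, we have to supply the type ourselves. The following sketch (a hypothetical helper) shows the usual fix:

receiveOneMessage :: Process ()
receiveOneMessage = do
  -- without the annotation, the type of message to receive would be ambiguous
  _ <- expect :: Process Message
  say "received a Message"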
The expect function is a little like Haskell’s read function, in
that it returns a value whose type depends on the context. But
whereas read fails if its argument cannot be parsed as the desired
type, expect skips over messages in the queue that do not match and
returns the first one that matches. Messages that don’t match the
expected type are left in the channel for the time being.
If there are no messages of the right type, expect will block until
one arrives. Therefore, it should be used with care: the other
messages in the queue are ignored while expect is waiting for the
right kind of message to arrive, which could lead to a deadlock.
We’ll see later how to wait for several different types of message at
the same time.
The say function, called at (2),
causes a message to be logged, which is a useful way to debug your
program. Usually, the message will be logged to stderr, but it might
be sent somewhere else if the transport layer overrides the default
logging process.
At (3) we call getSelfPid to
obtain the ProcessId of the current process. The ProcessId of the
current process is needed because the Pong message will contain it:
getSelfPid :: Process ProcessId
And at (4) we send a response back
to the originator of the Ping. The function send is used to send
a message to a process, and it has the following type:
send :: Serializable a => ProcessId -> a -> Process ()
We know which ProcessId to send the Pong to because it was
contained in the original Ping message.
Now we need to be able to create processes running pingServer.
Although in this example we will be creating the process on the local
node, in general we might be creating the process on another
node. Functions that will be executed remotely in this way need to be
declared explicitly.[56] The
following declaration invokes a bit of Template Haskell magic that creates the necessary infrastructure to allow pingServer to be executed remotely:[57]
remotable ['pingServer]
Next, we will write the code for the master process. As you might expect, this is an operation of type Process ():
master :: Process ()
master = do
  node <- getSelfNode                               -- the NodeId of the local node
  say $ printf "spawning on %s" (show node)
  pid <- spawn node $(mkStaticClosure 'pingServer)  -- start pingServer on this node
  mypid <- getSelfPid                               -- our own ProcessId
  say $ printf "sending ping to %s" (show pid)
  send pid (Ping mypid)                             -- send a Ping carrying our ProcessId
  Pong _ <- expect                                  -- wait for the Pong reply
  say "pong."
  terminate
All that remains to complete the program is to define our main
function, and here it is:
main :: IO ()
main = distribMain (\_ -> master) Main.__remoteTable
The main function calls distribMain from DistribUtils, which is a small
module of utilities provided with the sample code to make these
examples a bit less cluttered. The distribMain function is a wrapper around the lower-level startup facilities from the distributed-process-simplelocalnet package. It starts up the distributed-process framework with the
distributed-process-simplelocalnet backend on a single node.
The first argument to distribMain is the Process computation to
run as the master process on the node. It has type [NodeId] -> Process (), where the [NodeId] argument is the list of other nodes in our
distributed network. Because this example is
running on a single node, we ignore the [NodeId] and just invoke the
master function as our master process.
The second argument to distribMain is the metadata used to execute
remote calls; in this case we pass Main.__remoteTable, which is
generated by the Template Haskell call to remotable we showed
earlier.
When you run the program, you should see output like this:[58]
$ ./ping
pid://localhost:44444:0:3: spawning on nid://localhost:44444:0
pid://localhost:44444:0:3: sending ping to pid://localhost:44444:0:4
pid://localhost:44444:0:4: ping received from pid://localhost:44444:0:3
pid://localhost:44444:0:3: pong.
Each of these messages corresponds to one of the calls to say in the
example program, and they are tagged with the date, time, and
ProcessId of the process that called say.
In this section, we built the simplest distributed program possible: it spawns a single child process and performs a simple ping/pong message exchange. Here are the key things to take away:
- We created a new process using spawn, passing a NodeId and a Closure (Process ()). The former we got from getSelfNode (there are other ways, which we will encounter shortly), and the latter was generated by a call to the Template Haskell function mkStaticClosure.
- Code that communicates with other processes runs in the Process monad, which is a layer over the IO monad.
- Messages are sent by calling send and received by calling expect. Messages are ordinary Haskell data; the only requirement is that the type of the message is an instance of the Binary and Typeable classes.
There is a certain amount of boilerplate associated with distributed
programming: deriving Binary instances, declaring remotable
functions with remotable, starting up the framework with
distribMain, and so on. Remember that the distributed-process framework is
currently implemented as a library entirely in Haskell. There is no
support for distributed programming built into the language or GHC
itself, and this accounts for some of the boilerplate. As the
framework matures, distributed programming will likely become a
smoother experience.
The previous example showed how to create a process and exchange some simple messages. Now we will extend the program to be truly distributed. Instead of spawning a process on the local node, we will run the program on several nodes, create a process on each one, and perform the ping/pong protocol with all nodes simultaneously.
The Message type and pingServer remain exactly as before. The only
changes will be to the master and main functions. The new
master function is shown below, along with a waitForPongs helper function:
distrib-ping/ping-multi.hs
master :: [NodeId] -> Process ()                      -- (1)
master peers = do
  ps <- forM peers $ \nid -> do                       -- (2)
          say $ printf "spawning on %s" (show nid)
          spawn nid $(mkStaticClosure 'pingServer)

  mypid <- getSelfPid

  forM_ ps $ \pid -> do                               -- (3)
    say $ printf "pinging %s" (show pid)
    send pid (Ping mypid)

  waitForPongs ps                                     -- (4)

  say "All pongs successfully received"
  terminate

waitForPongs :: [ProcessId] -> Process ()             -- (5)
waitForPongs [] = return ()
waitForPongs ps = do
  m <- expect
  case m of
    Pong p -> waitForPongs (filter (/= p) ps)
    _      -> say "MASTER received ping" >> terminate
(1) This time, the master function takes the list of peer NodeIds as an argument.
(2) Spawn a new process on each of the peer nodes, and bind the resulting list of ProcessIds to ps.
(3) Send a Ping message to each of the spawned processes.
(4) Call waitForPongs to collect the responses.
(5) waitForPongs removes each ProcessId from the list as its Pong arrives and returns when the list is empty.
The main function is almost the same as before:
main :: IO ()
main = distribMain master Main.__remoteTable
The only difference is that the [NodeId] argument gets passed along
to master instead of being discarded here.
First, I'll illustrate starting multiple nodes on the same machine and then move on to multiple machines.
A distributed program consists of a single master node and one or more slave nodes. The master is the node that begins with a process running; the slave nodes just wait until processes are spawned on them.
Let’s start by creating two slave nodes:
$ ./ping-multi slave 44445 &
[3] 58837
$ ./ping-multi slave 44446 &
[4] 58847
The ping-multi program takes two command-line arguments; these are
interpreted by the distribMain function and tell it how to
initialize the framework. The first argument is either master or
slave and indicates which kind of node to create. The second
argument is the TCP port number that this node should use to
communicate on, with the default being 44444.[59] Always use different port numbers
when creating multiple nodes on the same machine.
I used & to create these as background processes in the shell. If
you’re on Windows, just open a few Command Prompt windows and run the
program in each one.
Having started the slaves, we now start the master node:
$ ./ping-multi
pid://localhost:44444:0:3: spawning on nid://localhost:44445:0
pid://localhost:44444:0:3: spawning on nid://localhost:44446:0
pid://localhost:44444:0:3: pinging pid://localhost:44445:0:4
pid://localhost:44444:0:3: pinging pid://localhost:44446:0:4
pid://localhost:44446:0:4: ping received from pid://localhost:44444:0:3
pid://localhost:44445:0:4: ping received from pid://localhost:44444:0:3
pid://localhost:44444:0:3: All pongs successfully received
The first thing to note is that the master node automatically found
the two slave nodes. The distributed-process-simplelocalnet package
includes a peer discovery mechanism that is designed to
automatically locate and connect to other instances running on the
same machine or other machines on the local network.
It is also possible to restart the master without restarting the
slaves—try invoking ping-multi again, and you should see the same
result. The new master node discovers and reconnects to the
existing slaves.
If we have multiple machines connected on the same network, we can run a distributed Haskell program on them. The first step is to distribute the binary to all the machines; every machine must be running the same binary. A mismatch in the binary on different machines can cause strange failures, such as errors when decoding messages.
Next, we start the slaves as before, but this time we start slaves on the remote machines and pass an extra argument:
$ ./ping-multi slave 192.168.1.100 44444
$ ./ping-multi slave 192.168.1.101 44444
(The above commands are executed on the appropriate machines.) The second argument is new and gives the IP address that identifies the slave. This is the address that the other nodes will use to contact it, so it must be an address that resolves to the correct machine. It doesn’t have to be an IP address, but using IP addresses is simpler and eliminates a potential source of failure (the DNS).
When the slaves are running, we can start the master:
$ ./ping-multi master 44444
pid://localhost:44444:0:3: spawning on nid://192.168.1.100:44444:0
pid://localhost:44444:0:3: spawning on nid://192.168.1.101:44444:0
pid://localhost:44444:0:3: pinging pid://192.168.1.100:44444:0:5
pid://localhost:44444:0:3: pinging pid://192.168.1.101:44444:0:5
pid://192.168.1.100:44444:0:5: ping received from pid://localhost:44444:0:3
pid://192.168.1.101:44444:0:5: ping received from pid://localhost:44444:0:3
pid://localhost:44444:0:3: All pongs successfully received
The program successfully identified the remote nodes, spawned a process on each one, and exchanged ping/pong messages with the process on each node.
In the examples so far, we saw messages being delivered to a process and the process receiving the messages by using expect. This scheme is quite convenient: we need to know only a process’s ProcessId to
send it messages, and we can send it messages of any type. However,
all the messages for a process go into the same queue, which has a
couple of disadvantages:
- Each time we call expect, the implementation has to search the queue for a message of the right type, which could be slow.
- If several conversations with a process are in progress at the same time, we have to add information to the messages to distinguish the conversations (e.g., the ProcessId of the sender).
The distributed-process framework provides an alternative means of
message passing based on typed channels, which addresses these two
problems. The interface is as follows:
data SendPort a     -- instance of Typeable, Binary
data ReceivePort a

newChan     :: Serializable a => Process (SendPort a, ReceivePort a)
sendChan    :: Serializable a => SendPort a -> a -> Process ()
receiveChan :: Serializable a => ReceivePort a -> Process a
A typed channel consists of two ports, a SendPort and a ReceivePort. Messages are sent to the SendPort using sendChan and received from the ReceivePort using receiveChan. As the name suggests, a typed channel can carry messages only of a particular
type.
Typed channels imply a different pattern of interaction. For example, suppose we were making a request to another process and expecting a response. Using typed channels, we could program this as follows:
- The client creates a new channel for the interaction and sends the request to the server, including the channel's SendPort.
- The server sends its response on the SendPort it was sent.
In general, the server might make its own channel and send that to the client, and the subsequent interaction would happen over these two channels.
The advantage of creating a channel to carry the response is that the client knows that a message arriving on this channel can only be a response to the original request, and it is not possible to mix up this response with other responses. The channel serves as a link between the original request and the response; we know that it is a response to this particular request, because it arrived on the right channel.
In the absence of typed channels, ensuring that the response can be uniquely identified would involve creating a new identifier to send along with the original message.[60]
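To make the pattern concrete, here is a minimal sketch, assuming a hypothetical Request type and a server that answers each request with a String:

data Request = Request String (SendPort String)
  deriving (Typeable, Generic)

instance Binary Request

-- Make a request and wait for the response on a fresh channel. Only the
-- response to this particular request can arrive on recvport.
doRequest :: ProcessId -> String -> Process String
doRequest server str = do
  (sendport, recvport) <- newChan
  send server (Request str sendport)
  receiveChan recvport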
Let’s look at how to modify the ping example to use typed channels:
distrib-ping/ping-tc.hs
data Message = Ping (SendPort ProcessId)
  deriving (Typeable, Generic)

instance Binary Message
Note that we don’t need a Pong message anymore. Instead, the Ping
message will contain a SendPort on which to send the reply, and the
reply is just the ProcessId of the sender. In fact, in this example
we don’t really need to send any content back at all—just sending
() would be enough—but for the purposes of illustration we will
send back the ProcessId.
pingServer :: Process ()
pingServer = do
  Ping chan <- expect
  say $ printf "ping received from %s" (show chan)
  mypid <- getSelfPid
  sendChan chan mypid
master :: [NodeId] -> Process ()
master peers = do
  ps <- forM peers $ \nid -> do
          say $ printf "spawning on %s" (show nid)
          spawn nid $(mkStaticClosure 'pingServer)

  mapM_ monitor ps

  ports <- forM ps $ \pid -> do
    say $ printf "pinging %s" (show pid)
    (sendport, recvport) <- newChan                  -- (1)
    send pid (Ping sendport)                         -- (2)
    return recvport

  forM_ ports $ \port -> do                          -- (3)
     _ <- receiveChan port
     return ()

  say "All pongs successfully received"
  terminate
(1) Create a new channel to carry the response.
(2) Send the ping message, including the SendPort of the new channel.
(3) Where previously we needed a separate waitForPongs function, here we simply call receiveChan on each port in turn to wait for the responses.
This code is simpler than the previous version in
“Multi-Node Ping”. However, note that we still sent the Ping
messages directly to the process, rather than using a typed
channel. If we wanted to use a typed channel here too, things get more
complicated. We want to do something like this (considering just a
single worker for simplicity):
do
  (s1, r1) <- newChan
  spawn nid ($(mkClosure 'pingServer) r1)
  (s2, r2) <- newChan
  sendChan s1 (Ping s2)
  receiveChan r2
This seems quite natural: we create a channel with send port s1 and
receive port r1 on which to send the
Ping message. Then we give the receive port of the channel to the
pingServer process when we spawn it. The code shows how to use spawn
to apply a function (here pingServer) to an argument (here
r1): use mkClosure instead of mkStaticClosure, and then pass the
argument to it (we’ll come back to this later; the details aren’t
important right now).
But there’s a big problem here. ReceivePorts are not Serializable,
which prevents us passing the ReceivePort r1 to the spawned
process. GHC will reject the program with a type error.
Why are ReceivePorts not Serializable? If you think about it a
bit, this makes a lot of sense. If a process were allowed to send a
ReceivePort somewhere else, the implementation would have to
deal with two things: routing messages to the correct destination when
a ReceivePort has been forwarded (possibly multiple times), and
routing messages to multiple destinations, because sending a
ReceivePort would create a new copy. This would introduce a vast
amount of complexity to the implementation, and it is not at all clear
that it is a good feature to allow. So the distributed-process framework
explicitly disallows it, which fortunately can be done using Haskell’s
type system.
This means that we have to jump through an extra hoop to fix the
previous code, though. Instead of passing the ReceivePort to the
spawned process, the spawned process must create the channel and send
us back the SendPort. This means we need another channel so
that the spawned process can send us back its SendPort.
do
  (s, r) <- newChan     -- throw-away channel
  spawn nid ($(mkClosure 'pingServer) s)
  ping <- receiveChan r

  (sendpong, recvpong) <- newChan
  sendChan ping (Ping sendpong)
  receiveChan recvpong
Since this extra handshake is a bit of a hassle, you might well prefer
to send messages directly to the spawned process using send rather
than using typed channels, which is exactly what the example code at the
beginning of this section did.
In the previous section, we waited for a response from each child process
in turn, whereas the old waitForPongs version processed the messages
in the order they arrived. In this case it isn’t a problem, but
suppose some of these messages required a response. Then we might
have introduced some extra latency: if a process toward the end of
the list replies early, it won’t get a response until the
master process has dealt with the messages from the other processes
earlier in the list, some of which might take a while to reply.
So we need a way to wait for messages from multiple channels
simultaneously. The distributed-process framework has an elegant
way to do this. Channels can be merged together to make a single
channel that receives messages from any of the original channels.
There are two ways to do this:
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsRR     :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
The difference is in the order in which messages arrive on the merged
channel. In mergePortsBiased, each receive searches the
ports in left-to-right order for a message, returning the first
message it finds. The alternative is mergePortsRR (the RR stands for "round robin"), which also searches left to right but rotates the list by one element after each receive, the leftmost port moving to the end of the list.
One important thing to note is that merging channels does not affect the original channel; we can still receive messages from either source, and indeed there is no problem with merging multiple overlapping sets of channels.[61]
Here is the ping example with channels, where instead of waiting for the responses one by one, we merge the channels together and wait for all the responses simultaneously.
distrib-ping/ping-tc-merge.hs
master :: [NodeId] -> Process ()
master peers = do
  ps <- forM peers $ \nid -> do
          say $ printf "spawning on %s" (show nid)
          spawn nid $(mkStaticClosure 'pingServer)

  ports <- forM ps $ \pid -> do
    say $ printf "pinging %s" (show pid)
    (sendport, recvport) <- newChan
    send pid (Ping sendport)
    return recvport

  oneport <- mergePortsBiased ports      -- merge the ports into a single port
  waitForPongs oneport ps                -- wait for responses on the merged port

  say "All pongs successfully received"
  terminate

waitForPongs :: ReceivePort ProcessId -> [ProcessId] -> Process ()
waitForPongs _ [] = return ()
waitForPongs port ps = do
  pid <- receiveChan port
  waitForPongs port (filter (/= pid) ps)
One of the important benefits provided by the distributed-process framework is handling and recovering
from failure. Failure is a fact of life in distributed computing, and we should be prepared for the possibility that any of our processes might fail at any time, whether due to network outage, a hardware crash, or software faults.
Here is a basic example showing how the failure of one process can be
caught and acted upon by another process. In the original ping
example from “Defining a Message Type”, recall that the Message
type has two constructors:
data Message = Ping ProcessId
             | Pong ProcessId
and the code for pingServer matches explicitly on the
Ping constructor:
distrib-ping/ping-fail.hs
pingServer :: Process ()
pingServer = do
  Ping from <- expect
  say $ printf "ping received from %s" (show from)
  mypid <- getSelfPid
  send from (Pong mypid)
What will happen if the message is a Pong, rather than a
Ping? Both messages have the type Message, so expect cannot
distinguish them; if the context requires a message of type Message, expect
can return either a Ping or a Pong. Clearly, if
expect returns a Pong here, then the pattern match against Ping will fail,
and as usual in Haskell this throws an
exception. Since there are no exception handlers, the
exception will result in the termination of the pingServer process.
There are ways to prevent the error, of course, but for now let’s see
how we can catch this failure from another process. We’ll use
withMonitor, which has the following signature:
withMonitor :: ProcessId -> Process a -> Process a
withMonitor takes a ProcessId to monitor and an action to perform. During the action, if the specified process fails in any way, a special message of type ProcessMonitorNotification is sent to the current process.
To wait for either the ProcessMonitorNotification message or a Pong, we need to know how to wait for different types of message at the same time. The basic pattern for this is as follows:
receiveWait
  [ match $ \p -> do ...
  , match $ \q -> do ...
  ]
where p and q are patterns that match different types of
message. The types of these functions are shown here:
receiveWait    :: [Match b] -> Process b
receiveTimeout :: Int -> [Match b] -> Process (Maybe b)

match   :: Serializable a => (a -> Process b) -> Match b
matchIf :: Serializable a => (a -> Bool) -> (a -> Process b) -> Match b
The function receiveWait waits until any of the match functions
applies to a message in the queue, and then executes the associated
action. The receiveTimeout operation is similar, but instead of waiting indefinitely for a matching message, it takes a timeout in microseconds and returns Nothing if a matching message did not arrive before the timeout expired.
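As a small illustration using the Message type from this example (the helper itself is hypothetical), here is how we might wait up to one second for a Pong from a particular process, using matchIf to ignore everything else:

-- The timeout is in microseconds, so 1000000 is one second.
waitForPongFrom :: ProcessId -> Process (Maybe ProcessId)
waitForPongFrom pid =
    receiveTimeout 1000000
      [ matchIf isPongFrom (\(Pong p) -> return p) ]
  where
    isPongFrom (Pong p) = p == pid
    isPongFrom _        = False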
Here is how we monitor the pingServer process and then wait for
either a Pong message or a ProcessMonitorNotification:
distrib-ping/ping-fail.hs
  withMonitor pid $ do
    send pid (Pong mypid)                   -- (1)
    receiveWait
      [ match $ \(Pong _) -> do
         say "pong."
         terminate
      , match $ \(ProcessMonitorNotification _ref deadpid reason) -> do
         say (printf "process %s died: %s" (show deadpid) (show reason))
         terminate
      ]
Note that we deliberately send the child a Pong message (1) to cause it to fail. Running the program
results in this:
pid://localhost:44444:0:3: spawning on nid://localhost:44444:0
pid://localhost:44444:0:3: sending ping to pid://localhost:44444:0:4
pid://localhost:44444:0:3: process pid://localhost:44444:0:4 died:
DiedException "user error (Pattern match failure in do expression at
distrib-ping/ping-fail.hs:24:3-11)"
The third log message indicates that the master received the notification of the failed process, and gives the details of the failure: a pattern-match error, as we expected.
It is worth asking whether having a single Message data type for our
messages was a good idea in the first place. Perhaps we should have
made separate types, as in:
newtype Pong = Pong ProcessId
newtype Ping = Ping ProcessId
The choice comes down to whether we are using typed channels or not. A typed channel can carry only a single message type, whereas the per-process dynamically typed channel used by send with expect or receiveWait can carry messages of multiple types. Having
one type for each message would avoid the possibility of a pattern-match
failure when matching on a message, but unless we also have a
catch-all case to match unrecognized messages, the other messages
could be left in the queue forever, which could amount to an
undetected error or deadlock. So there might well be cases where we want to match on both kinds of message, treating one of them as a definite error; using a single message type helps ensure that we always match on all the possible messages.
The more appropriate choice depends on the particular circumstances in your application.
A summary of the API for process monitoring follows:
monitor     :: ProcessId -> Process MonitorRef
unmonitor   :: MonitorRef -> Process ()
withMonitor :: ProcessId -> Process a -> Process a

data ProcessMonitorNotification
  = ProcessMonitorNotification MonitorRef ProcessId DiedReason

data MonitorRef  -- abstract

data DiedReason
  = DiedNormal             -- Normal termination
  | DiedException !String  -- The process exited with an exception
  | DiedDisconnect         -- We got disconnected from the process node
  | DiedNodeDown           -- The process node died
  | DiedUnknownId          -- Invalid (process/node/channel) identifier
In addition to the withMonitor function mentioned earlier, a process
can also be monitored by calling the monitor function. This
function returns a token of type MonitorRef, which can be passed to
unmonitor to stop monitoring the process. In general, it is
better to use withMonitor than the monitor and unmonitor pair if
possible, because withMonitor will automatically stop monitoring the
remote process in the event of an exception. However, sometimes
withMonitor doesn’t fit the control flow, which is when monitor and
unmonitor are useful.
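For example, suppose workers announce completion with a message, and we want to monitor each worker only while it is still busy; withMonitor's bracketed scope doesn't fit, because each monitor ends at a different time. Here is a sketch, assuming a hypothetical Done message that each worker sends when it finishes:

data Done = Done ProcessId
  deriving (Typeable, Generic)

instance Binary Done

-- Monitor every worker; unmonitor each one when its Done arrives, and log
-- any worker that dies before finishing.
waitForWorkers :: [ProcessId] -> Process ()
waitForWorkers pids = do
  refs <- mapM monitor pids
  loop (zip pids refs)
 where
  loop [] = return ()
  loop workers =
    receiveWait
      [ match $ \(Done pid) -> do
          mapM_ unmonitor (lookup pid workers)
          loop (filter ((/= pid) . fst) workers)
      , match $ \(ProcessMonitorNotification _ pid reason) -> do
          say (printf "worker %s died: %s" (show pid) (show reason))
          loop (filter ((/= pid) . fst) workers)
      ]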
In a distributed system, parts of the running program may fail at any time due to circumstances beyond our control. Such a failure typically results in one or more of the processes in our network becoming disconnected without warning; there is no exception and no opportunity to clean up whatever it was doing. Perhaps the hardware it was running on failed, or the network on which we were communicating with it stopped working.
A far-reaching approach for such failures can be seen in Erlang, a programming language with distributed programming at its heart. The only mechanism for communication is message passing, so every concurrent Erlang program is fundamentally distributable. The Erlang designers promote a particular philosophy for dealing with failure, often known by its catchphrase: "Let it crash." The basic principle is that since in a distributed system we must already be prepared for a process to simply disappear, we might as well deal with all kinds of failure in this way because doing so makes failure handling much simpler. And since failure handling is difficult to test, making it simpler is highly desirable.
Concretely, instead of trying to enumerate local failure conditions and handle them in some way, we can just let them propagate to the top of the process and let the process die. The distributed program must be prepared for this eventuality already (since this is a distributed system), so the system will recover in some way: perhaps by restarting the failed process in some known-good state and logging the failure somewhere.
Thus the granularity at which we have to consider failure is the process, and we can design our applications such that individual processes can fail without catastrophic consequences. A process will probably have some internal state that is lost when it dies, but the parent should know how to construct the initial state to restart the process or to propagate the failure to a higher layer that can.
In “A Chat Server”, we built a multithreaded chat server using Concurrent Haskell and STM. In this section, we will extend the chat server to be distributed. The server will be running across multiple machines, clients may connect to any of the machines, and any client will be able to chat with any other client connected via any of the servers. Essentially, the distributed chat server will behave just like the single-node server (minus some subtle differences that we will discuss shortly), except that clients have a choice of machines to connect to.
A distributed chat network saves bandwidth. For example, suppose we set up a chat network with two servers A and B on each side of the Atlantic Ocean. Each server has a large number of clients connected, with each client connecting to its closest server. When a client on server A broadcasts a message, it needs to be sent across the trans-Atlantic link to server B only once, and server B then forwards it to each of its connected clients. The broadcast message crosses the Atlantic only once, instead of once for each of the clients on the other side.
We have already written all the code for the multithreaded server, so
it seems a shame to throw it away and rewrite it all to use
distributed-process instead. Fortunately, we don’t have to do that. We can simply add some extra code to handle distribution, using the original server code nearly intact. Each client will still be managed
by ordinary IO threads synchronized using STM, but additionally we
will have some code communicating with the other servers using
distributed-process. In Haskell, distributed programming is not
all or nothing. We can freely mix distributed and concurrent
programming in the same program. This means we can take advantage of
the simplicity and performance of ordinary concurrent programming on
each node, while using the heavier-weight distributed interfaces for
the parts of the program that need to work across multiple nodes.
In this first version, we will use a master/slave configuration in which the master will start up server instances on all the slaves once at the beginning. Later, we will consider how to modify the program so that all nodes are equal, and nodes may come and go at arbitrary times.
We will need a few changes to the data structures compared with the multithreaded server. When one client sends a message to another client connected to a different server, we need to know where to send the message. So each server will need to keep a list of all the clients connected to any server in the network, along with the server to which the client is connected. The information about a client now has two possibilities: either it is a local client (connected to this server), or a remote client (connected to a different server).
distrib-chat/chat.hs
type ClientName = String

data Client
  = ClientLocal   LocalClient
  | ClientRemote  RemoteClient

data RemoteClient = RemoteClient
       { remoteName :: ClientName
       , clientHome :: ProcessId
       }

data LocalClient = LocalClient
       { localName      :: ClientName
       , clientHandle   :: Handle
       , clientKicked   :: TVar (Maybe String)
       , clientSendChan :: TChan Message
       }

clientName :: Client -> ClientName
clientName (ClientLocal  c) = localName c
clientName (ClientRemote c) = remoteName c

newLocalClient :: ClientName -> Handle -> STM LocalClient
newLocalClient name handle = do
  c <- newTChan
  k <- newTVar Nothing
  return LocalClient { localName      = name
                     , clientHandle   = handle
                     , clientSendChan = c
                     , clientKicked   = k
                     }
LocalClient is what we previously called Client, and
RemoteClient is a client connected to another server. The Client
type is now a disjunction of these two, with constructors
ClientLocal and ClientRemote.
The Message type is as before, except that we need to derive
Typeable and Binary, because Messages will be sent over the
network:
data Message = Notice String
             | Tell ClientName String
             | Broadcast ClientName String
             | Command String
  deriving (Typeable, Generic)

instance Binary Message
Servers need to communicate with one another, and the kinds of messages
they need to send are richer than Message. For example, servers need
to tell one another when a new client connects, or one client kicks
another. So we have a new type for messages sent between servers,
which we call PMessage:
data PMessage
  = MsgServers            [ProcessId]
  | MsgSend               ClientName Message
  | MsgBroadcast          Message
  | MsgKick               ClientName ClientName
  | MsgNewClient          ClientName ProcessId
  | MsgClientDisconnected ClientName ProcessId
  deriving (Typeable, Generic)

instance Binary PMessage
Most of these are self-explanatory, except for one: MsgServers is a
special message sent to each server node when it starts up, telling it
the ProcessIds of all the server nodes in the network.
The Server type previously contained only the mapping from
ClientName to Client, but now it needs some more information:
data Server = Server
  { clients   :: TVar (Map ClientName Client)
  , proxychan :: TChan (Process ())
  , servers   :: TVar [ProcessId]
  , spid      :: ProcessId
  }

newServer :: [ProcessId] -> Process Server
newServer pids = do
  pid <- getSelfPid
  liftIO $ do
    s <- newTVarIO pids
    c <- newTVarIO Map.empty
    o <- newTChanIO
    return Server { clients = c, servers = s, proxychan = o, spid = pid }
clients is the client mapping, as before; servers is the list of
other server ProcessIds, and spid is the ProcessId of this
server (for convenience).
The proxychan field pertains to an added bit of complexity in our
distributed architecture. Remember that we are
leaving as much of the existing server infrastructure intact as
possible; that means the existing server threads are ordinary
forkIO threads. A forkIO thread cannot perform operations in the
Process monad, yet we certainly need to be able to do that somehow
because certain actions by a client must trigger communication with
other servers in the network. So the trick we use is a proxy, which is a
process that reads actions from a TChan and performs them in the
Process monad. To have a Process action performed from an IO
thread, we simply queue it on the proxy TChan. Each server has a
single proxy channel, created when the server starts up and stored in
the proxychan field of Server.
Next, we need a few small utilities. First, a way to send a Message
to a LocalClient:
sendLocal :: LocalClient -> Message -> STM ()
sendLocal LocalClient{..} msg = writeTChan clientSendChan msg
The following function, sendRemote, sends a PMessage to a remote
server. To do this, it needs to use the proxychan (which it gets
from the Server) and it needs the pid of the destination process:
sendRemote :: Server -> ProcessId -> PMessage -> STM ()
sendRemote Server{..} pid pmsg = writeTChan proxychan (send pid pmsg)
Now that we can send both local and remote messages, we can define
sendMessage, which sends a Message to any client:
sendMessage :: Server -> Client -> Message -> STM ()
sendMessage server (ClientLocal client) msg =
  sendLocal client msg
sendMessage server (ClientRemote client) msg =
  sendRemote server (clientHome client) (MsgSend (remoteName client) msg)
A variant sends a message to a named client or returns False if the
client is not connected:
sendToName :: Server -> ClientName -> Message -> STM Bool
sendToName server@Server{..} name msg = do
  clientmap <- readTVar clients
  case Map.lookup name clientmap of
    Nothing     -> return False
    Just client -> sendMessage server client msg >> return True
Next, we consider broadcasting messages. First, we need a way to send
a PMessage to all the connected servers:
sendRemoteAll :: Server -> PMessage -> STM ()
sendRemoteAll server@Server{..} pmsg = do
  pids <- readTVar servers
  mapM_ (\pid -> sendRemote server pid pmsg) pids
We also need a broadcastLocal function that sends a message to the local
clients only:
broadcastLocal :: Server -> Message -> STM ()
broadcastLocal server@Server{..} msg = do
  clientmap <- readTVar clients
  mapM_ sendIfLocal (Map.elems clientmap)
 where
  sendIfLocal (ClientLocal c)  = sendLocal c msg
  sendIfLocal (ClientRemote _) = return ()
This function works by calling an auxiliary function sendIfLocal on
each of the clients, which calls sendLocal if the client is local
and does nothing if the client is remote.
Putting sendRemoteAll and broadcastLocal together, we can
broadcast a Message to everyone:
broadcast :: Server -> Message -> STM ()
broadcast server@Server{..} msg = do
  sendRemoteAll server (MsgBroadcast msg)
  broadcastLocal server msg
The rest of the local server code is almost identical to that in
“A Chat Server”, so we don’t reproduce it here. The only important
differences are that we need to inform other servers whenever a client
connects or disconnects by calling sendRemoteAll with a
MsgNewClient or MsgClientDisconnected respectively.
The interesting part is how we handle distribution. Previously, the
main function was responsible for setting up the network socket and
accepting new connections. This is now delegated to a function
socketListener, which is otherwise identical to the previous main:
socketListener :: Server -> Int -> IO ()
socketListener server port = withSocketsDo $ do
  sock <- listenOn (PortNumber (fromIntegral port))
  printf "Listening on port %d\n" port
  forever $ do
      (handle, host, port) <- accept sock
      printf "Accepted connection from %s: %s\n" host (show port)
      forkFinally (talk server handle)
                  (\_ -> hClose handle)
We need a function to implement the proxy, described above in
“Sending Messages”. All it does is repeatedly read Process
() values from the proxychan and execute them:
proxy :: Server -> Process ()
proxy Server{..} = forever $ join $ liftIO $ atomically $ readTChan proxychan
Now, the chatServer function is the main Process () action that
implements a chat server:
chatServer :: Int -> Process ()
chatServer port = do
  server <- newServer []
  liftIO $ forkIO (socketListener server port)            -- (1)
  spawnLocal (proxy server)                               -- (2)
  forever $ do m <- expect; handleRemoteMessage server m  -- (3)
(1) Starts up the socketListener in a separate thread with forkIO.
(2) Creates the proxy process with spawnLocal.
(3) Repeatedly grabs the next PMessage with expect and calls handleRemoteMessage on it.
handleRemoteMessage :: Server -> PMessage -> Process ()
handleRemoteMessage server@Server{..} m = liftIO $ atomically $
  case m of
    MsgServers pids  -> writeTVar servers (filter (/= spid) pids)   -- (1)
    MsgSend name msg -> void $ sendToName server name msg           -- (2)
    MsgBroadcast msg -> broadcastLocal server msg                   -- (3)
    MsgKick who by   -> kick server who by                          -- (4)

    MsgNewClient name pid -> do                                     -- (5)
        ok <- checkAddClient server (ClientRemote (RemoteClient name pid))
        when (not ok) $
          sendRemote server pid (MsgKick name "SYSTEM")

    MsgClientDisconnected name pid -> do                            -- (6)
         clientmap <- readTVar clients
         case Map.lookup name clientmap of
            Nothing -> return ()
            Just (ClientRemote (RemoteClient _ pid')) | pid == pid' ->
              deleteClient server name
            Just _ ->
              return ()
(1) The special MsgServers message tells this server the ProcessIds of all the servers in the network; we record all of them except our own spid in the servers TVar.
(2) MsgSend delivers a message to a named client on this server, using sendToName.
(3) MsgBroadcast delivers a message to all the local clients, using broadcastLocal.
(4) MsgKick invokes the existing kick operation, just as if a local client had issued the kick.
(5) MsgNewClient adds a remote client to the local client map; if the name is already in use, we ask the remote server to kick the new client.
(6) MsgClientDisconnected removes a remote client, but only if the ProcessId matches, in case a client with the same name has since connected elsewhere.
Now that the server code is in place, we just need to write the
code to start up the whole distributed network. The main function
invokes master on the master node:
port :: Int
port = 44444

master :: [NodeId] -> Process ()
master peers = do

  let run nid port = do
         say $ printf "spawning on %s" (show nid)
         spawn nid ($(mkClosure 'chatServer) port)

  pids <- zipWithM run peers [port+1..]
  mypid <- getSelfPid
  let all_pids = mypid : pids
  mapM_ (\pid -> send pid (MsgServers all_pids)) all_pids

  chatServer port

main = distribMain master Main.__remoteTable
The master function is fairly straightforward. It spawns
chatServer on each of the slaves, using increasing port numbers, and then sends a MsgServers message to each server process containing a list of all the server ProcessIds.[62]
We can start up a few nodes on a single machine like so:
$ ./chat slave 55551 & ./chat slave 55552 & ./chat master 55553
pid://localhost:55553:0:3: spawning on nid://localhost:55552:0
pid://localhost:55553:0:3: spawning on nid://localhost:55551:0
Listening on port 44444
Listening on port 44445
Listening on port 44446
(Remember the port numbers given on the command line are the ports
used by the distributed-process framework; the ports that the chat
server listens to are hardcoded to 44444, 44445, …)
Then connect to one of the nodes:
$ nc localhost 44445
What is your name?
Fred
*** Fred has connected
And connect to a different node:
$ nc localhost 44446
What is your name?
Bob
*** Bob has connected
hi
<Bob>: hi
We should now see the new activity on the first connection:
*** Bob has connected
<Bob>: hi
Our distributed server works only with a fixed set of nodes, which makes it quite limited. In practice, we want to be able to add and remove nodes from the network at will. Nodes will disconnect due to network and hardware outages, and we would like to be able to add new nodes without restarting the entire network.
My sketch implementation can be found in distrib-chat/chat-noslave.hs, but you might want to try implementing this for yourself. Some hints on how to go about it follow.
We need to abandon the master/slave architecture; every node will be
equal. Instead of using our DistribUtils module, we can use the
following sequence to initialize the simplelocalnet backend and
start up a node:
distrib-chat/chat-noslave.hs
main = do
 [port, chat_port] <- getArgs
 backend <- initializeBackend "localhost" port
                              (Main.__remoteTable initRemoteTable)
 node <- newLocalNode backend
 Node.runProcess node (master backend chat_port)
Now the function master has type Backend -> String -> Process ()
and runs on every node. The outline of the rest of the
implementation is as follows:
When a node starts up, it calls findPeers to get the other nodes
in the network.
findPeers :: Backend -> Int {- timeout -} -> IO [NodeId]
It registers the current process as "chatServer" on the local node using the register function:
register :: String -> ProcessId -> Process ()
Next we call whereisRemoteAsync for each of the other nodes,
asking for the ProcessId of "chatServer".
whereisRemoteAsync :: NodeId -> String -> Process ()
The remote node will respond with a WhereIsReply:
data WhereIsReply = WhereIsReply String (Maybe ProcessId)
We won’t wait for the reply immediately; it will be received along with other messages in the main message loop.
The main message loop handles PMessages for chatServer as before, but now we need to also handle WhereIsReply messages. When one of these messages is received, if it indicates that we found a "chatServer" process on another node, then we move on to the next step.
We send that ProcessId a message to tell it that we have joined the network. This is a new PMessage that we call MsgServerInfo. It contains the current ProcessId and the list of local clients we have (because clients may have already connected by now).
On receipt of a MsgServerInfo, add that ProcessId to the servers list if it isn't already there.
In that case, we also respond with a MsgServerInfo of our own to tell the other server which local clients are on this server.
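Here is a sketch of the discovery phase described in the first few steps, using the functions shown above (the timeout value and the error handling are simplified):

findChatServers :: Backend -> Process ()
findChatServers backend = do
  mypid  <- getSelfPid
  register "chatServer" mypid                    -- make ourselves discoverable
  peers  <- liftIO $ findPeers backend 1000000   -- one-second timeout
  mynode <- getSelfNode
  forM_ (filter (/= mynode) peers) $ \nid ->
    whereisRemoteAsync nid "chatServer"          -- replies arrive as WhereIsReply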
A key-value store is a simple database that supports only operations to store and retrieve values associated with keys. Key-value stores have become popular over recent years because they offer scalability advantages over traditional relational databases in exchange for supporting fewer operations (in particular, they lack database joins).
This exercise is to use the distributed-process framework to
implement a distributed fault-tolerant key-value store (albeit a
very simplistic one).
The interface exposed to clients is the following:
type Database

type Key   = String
type Value = String

createDB :: Process Database

set :: Database -> Key -> Value -> Process ()
get :: Database -> Key -> Process (Maybe Value)
Here, createDB creates a database, and set and get perform
operations on it. The set operation sets the given key to the given
value, and get returns the current value associated with the given
key or Nothing if the key has no entry.
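One possible shape for the implementation (a sketch only, not the provided sample solution) is to make Database a ProcessId and give each get request a reply channel:

type Database = ProcessId

data DBRequest
  = ReqSet Key Value
  | ReqGet Key (SendPort (Maybe Value))
  deriving (Typeable, Generic)

instance Binary DBRequest

set :: Database -> Key -> Value -> Process ()
set db k v = send db (ReqSet k v)

get :: Database -> Key -> Process (Maybe Value)
get db k = do
  (s, r) <- newChan
  send db (ReqGet k s)     -- the database process replies on s
  receiveChan r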
Part 1. In distrib-db/db.hs, I supplied a sample main function that
acts as a client for the database, and you can use this to test your
database. The skeleton for the database code itself is in
Database.hs in the same directory. The first exercise is to
implement a single-node database by modifying Database.hs. That is:
- createDB should spawn a process to act as the database. It can spawn on the current node.
- get and set should talk to the database process via messages; you need to define the message type and the operations.
When you run db.hs, it will call createDB to create a database
and then populate it using the Database.hs source file itself. Every
word in the file is a key that maps to the word after it. The client
will then look up a couple of keys and then go into an interactive
mode where you can type in keys that are looked up in the database.
Try it out with your database implementation and satisfy yourself
that it is working.
Part 2. The second stage is to make the database distributed. In practice, the reason for doing this is to store a database much larger than we can store on a single machine and still have fast access to all of it.
The basic plan is that we are going to divide up the key space uniformly and store each portion of the key space on a separate node. The exact method used for splitting up the key space is important in practice because if you get it wrong, then the load might not be well-balanced between the nodes. For the purposes of this exercise, though, a simple scheme will do: take the first character of the key modulo the number of workers.
There will still be a single process handling requests from clients,
so we still have type Database = ProcessId. However, this process
needs to delegate requests to the correct worker process according to
the key:
- The worker processes should be spawned by createDB, one on each node.
- The worker code must be in a different module (call it Worker) due to restrictions imposed by Template Haskell. The worker process needs to maintain its own Map and handle get and set requests. (A sketch of the key-to-worker dispatch appears below.)
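For instance, the first-character scheme might look like the following sketch, where workers is the list of worker ProcessIds (keys are assumed non-empty):

-- Needs: import Data.Char (ord)
workerForKey :: [ProcessId] -> Key -> ProcessId
workerForKey workers k = workers !! (ord (head k) `mod` length workers)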
Compile db.hs against your distributed database to make sure it still works.
Part 3. Make the main database process monitor all the worker processes.
Detect failure of a worker and emit a message using say. You will
need to use receiveWait to wait for multiple types of messages; see
the ping-fail.hs example for hints.
Note that we can’t yet do anything sensible if a worker dies. That is the next part of the exercise.
Part 4. Implement fault tolerance by replicating the database across multiple nodes.
- Store each piece of the key space on two workers: forward every set request to both of them, and answer each get request from either one (it doesn't matter which worker answers a get).[63]
This should result in a distributed key-value store that is robust to individual nodes going down, as long as we don’t kill too many nodes too close together. Try it out—kill a node while the database is running and check that you can still look up keys.
A sample solution can be found in distrib-db/DatabaseSample.hs and distrib-db/WorkerSample.hs.
[50] Also known as “Cloud Haskell.”
[51] This is also known as the actor model.
[52] The distributed-process package is in fact the second implementation of these ideas, the first prototype being the remote package.
[53] For example, meta-par and
HdpH.
[54] As of binary version 0.6.3.0.
[55] As of GHC version 7.2.1.
[56] We expect that in the future, GHC will provide syntactic sugar to make remote code execution easier.
[57] Template Haskell is a feature provided by GHC that allows Haskell code to be manipulated and generated at compile time. For more details, see the GHC User’s Guide.
[58] The log messages produced by say are normally prefixed by a timestamp, but I have omitted the timestamps here for clarity.
[59] The default
port is chosen by our distribMain wrapper, not the
distributed-process framework.
[60] Indeed, some of Erlang’s libraries use exactly this technique.
[61] The current implementation of channels uses STM, and channels are merged using orElse.
[62] This is mainly so that we can test the server on a single machine; in practice, you would want to choose the port number via a command-line option or some other method.
[63] A real fault-tolerant database would restart the worker on a new node and copy the database slice from its partner. The solution provided in this book doesn’t do this, but by all means have a go at doing it.