Server-type applications that communicate with many clients simultaneously demand both a high degree of concurrency and high performance from the I/O subsystem. A good web server should be able to handle hundreds of thousands of concurrent connections and service tens of thousands of requests per second.
Ideally, we would like to write these kinds of applications using threads. A thread is the right abstraction. It allows the developer to focus on programming the interaction with a single client and then to lift this interaction to multiple clients by simply forking many instances of the single-client interaction in separate threads. In this chapter, we explore this idea by developing a series of server applications, starting from a trivial server with no interaction between clients, then adding some shared state, and finally building a chat server with state and inter-client interaction.
Along the way, we will need to draw on many of the concepts from
previous chapters. We’ll discuss the design of the server using both
MVar and STM, how to handle failure, and building groups of threads
using the abstractions introduced in “Symmetric Concurrency Combinators”.
In this section, we will consider how to build a simple network server with the following behavior:
- The server accepts connections from clients on port 44444.
- If a client sends a number n, the server responds with the value of 2n.
- If a client sends the line "end", then the server closes the connection.
First, we program the interaction with a single client. The function
talk defined below takes a Handle for communicating with the
client. The Handle will be bound to a network socket so that data
sent by the client can be read from the Handle, and data written to
the Handle will be sent to the client.
server.hs
talk :: Handle -> IO ()
talk h = do
    hSetBuffering h LineBuffering   -- line buffering, so replies are flushed promptly
    loop
  where
    loop = do
      line <- hGetLine h            -- read one line from the client
      if line == "end"
         then hPutStrLn h ("Thank you for using the " ++
                           "Haskell doubling service.")
         else do hPutStrLn h (show (2 * (read line :: Integer)))
                 loop
Having dealt with the interaction with a single client, we can now
make this into a multiclient server using concurrency. The main
function for our server is as follows:
main = withSocketsDo $ do
  sock <- listenOn (PortNumber (fromIntegral port))
  printf "Listening on port %d\n" port
  forever $ do
    (handle, host, port) <- accept sock
    printf "Accepted connection from %s: %s\n" host (show port)
    forkFinally (talk handle) (\_ -> hClose handle)

port :: Int
port = 44444
First, we create a network socket to listen on port 44444. Then we enter a loop to accept connections from clients. The accept call blocks until a connection request from a client arrives, and then returns a Handle for communicating with the client, along with the client's host name and port number, which we print. Next, we call forkFinally to fork a new thread that runs talk on this client's Handle, ensuring that the Handle is closed when talk returns or throws an exception.[43]
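For reference, forkFinally runs its second argument when the thread's action completes, whether normally or by an exception; it is defined in Control.Concurrent essentially as follows:

forkFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally action and_then =
  mask $ \restore ->
    forkIO $ try (restore action) >>= and_then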
Having forked a thread to handle this client, the main thread then goes back to accepting more connections. All the active client connections and the main thread run concurrently with each other, so the fact that the server is handling multiple clients will be invisible to any individual client.
So making our concurrent server was simple—we did not have to change the single-client code at all, and the code to lift it to a concurrent server was only a handful of lines. We can verify that it works by starting the server in one window:
$ ./server
Listening on port 44444
In another window, we start a client and try a single request. We send 22 and get 44 in return.[44]
$ nc localhost 44444
22
44
Next, we leave this client running and start another client:
$ ghc -e 'mapM_ print [1..]' | nc localhost 44444
2
4
6
...
This client exercises the server a bit more by sending it a continuous stream of numbers to double. For fun, try starting a few of these. Meanwhile we can switch back to our first client and observe that it is still being serviced:
$ nc localhost 44444
22
44
33
66
Finally, we can end a single client's interaction by typing end:

end
Thank you for using the Haskell doubling service.
This was just a simple example, but the same ideas underlie several
high-performance web server implementations in Haskell. Furthermore,
with no additional effort at all, the same server code can make use of
multiple cores simply by compiling with -threaded and running with
+RTS -N.
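For example, one plausible way to build and run it (assuming GHC and the network package are installed):

$ ghc -O2 -threaded server.hs
$ ./server +RTS -N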
There are two technologies that make this structure feasible in Haskell:
- GHC's extremely lightweight threads, which make one thread per client practical even with very large numbers of clients.
- The I/O manager, which multiplexes the I/O operations of all these threads onto efficient operating system APIs such as epoll on Linux. Thus applications with lots of lightweight threads, all doing I/O simultaneously, perform very well.
Were it not for lightweight threads and the I/O manager, we would have
to resort to collapsing the structure into a single event loop (or
worse, multiple event loops to take advantage of multiple cores). The
event loop style loses the single-client abstraction. Instead, all
clients have to be dealt with simultaneously, which can be complicated
if there are different kinds of clients with different behaviors.
Furthermore, we have to represent the state of each client somehow,
rather than just writing the straight-line code as we did in talk earlier. Imagine extending talk to implement a more elaborate
protocol with several states—it would be reasonably straightforward
with the single-client abstraction, but if we had to represent each
state and the transitions explicitly, things would quickly get
complicated.
We ignored many details that would be necessary in a real server application. The reader is encouraged to think about these and try implementing any required changes on top of the provided sample code:
- What happens when the user interrupts the server with Control-C? (Control-C is implemented by sending an asynchronous Interrupted exception to the main thread.)
- What happens in talk if the line does not parse as a number?
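For the second question, one plausible fix (a sketch, not from the sample code) is to replace the partial read with readMaybe from Text.Read inside talk's loop:

import Text.Read (readMaybe)

loop = do
  line <- hGetLine h
  if line == "end"
    then hPutStrLn h ("Thank you for using the " ++
                      "Haskell doubling service.")
    else do case (readMaybe line :: Maybe Integer) of
              Just n  -> hPutStrLn h (show (2 * n))
              Nothing -> hPutStrLn h "please send a number, or \"end\""
            loop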
Next, we’ll extend the simple server from the previous section to include some state that is shared amongst the clients and may be changed by client actions.
The new behavior is as follows: instead of multiplying each number by two, the server will multiply each number by the current factor. Any connected client can change the current factor by sending the command *N, where N is an integer. When a client changes the factor, the server sends a message to all the other connected clients informing them of the change.
While this seems like a small change in behavior, it introduces some interesting new challenges in designing the server.
Let’s explore the design space, taking as a given that we want to serve each client from a separate thread on the server. Over the following sections, I’ll outline four possible designs and explain the pros and cons of each one.
This is the simplest approach. The state of the server is stored
under a single MVar and looks something like this:
data State = State
  { currentFactor :: Int
  , clientHandles :: [Handle]
  }

newtype StateVar = StateVar (MVar State)
Note that the state contains all the Handles of the connected
clients. This is so that if a server thread receives a factor-change
command from its client, it can notify all the other clients of the
change by writing a message to their Handle.
However, we have to be careful. If multiple threads write to a
Handle simultaneously, the messages might get interleaved in an
arbitrary way. To make sure messages don’t get interleaved, we
can use the MVar as a lock. But this means that every server
thread, when it needs to send a message to its client, must hold the
MVar while sending the message.
Clearly, the disadvantage of this model is that there will be lots of
contention for the shared MVar, since even when clients are not
interacting with each other, they still have to take the lock. This
design does not have enough concurrency.
Note that we can’t reduce contention by using finer-grained locking here because the combination of modifying the state and informing all the clients must be atomic. Otherwise, the notifications created by multiple factor-change commands could interleave with one another and clients may end up being misled about the current factor value.
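To make the atomicity requirement concrete, here is a minimal sketch of what a factor change might look like under this design (setFactor is a hypothetical helper, not part of the sample code; it assumes the State and StateVar types above):

setFactor :: StateVar -> Int -> IO ()
setFactor (StateVar m) new =
  modifyMVar_ m $ \(State _ handles) -> do
    -- Notify every client while holding the MVar, so notifications
    -- from concurrent factor changes cannot interleave.
    mapM_ (\h -> hPutStrLn h ("new factor: " ++ show new)) handles
    return (State new handles)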
To add more concurrency, we want to design the system so that each
server thread can communicate with its client privately without
interacting with the other server threads. Therefore, the Handle for
communicating with the client must be private to each server thread.
The factor-change command still has to notify all the clients, but
since the server thread is the only thread allowed to communicate with
a client, we must send a message to all the server threads when a
factor-change occurs. Therefore, each server thread must have a Chan
on which it receives messages.
The types in this setup would look like this:
data State = State { clientChans :: [Chan Message] }

data Message = FactorChange Int
             | ClientInput String

newtype StateVar = StateVar (MVar State)
There are two kinds of events that a server thread can act upon:
a factor-change event from another server thread or a line of
input from the client. Therefore, we make a Message type to combine
these two events so that the Chan can carry either. How do the
ClientInput events get generated? We need another thread for each
server thread whose sole job it is to receive lines of input from the
client’s Handle and forward them to the Chan in the form of
ClientInput events. I’ll call this the "receive thread."
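A minimal sketch of such a receive thread, assuming the types above:

receive :: Handle -> Chan Message -> IO ()
receive h chan = forever $ do
  line <- hGetLine h
  writeChan chan (ClientInput line)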
This design is an improvement over the first design, although it does
still have one drawback. A server thread that receives a factor-change
command must iterate over the whole list of Chans sending a
message to each one, and this must be done with the lock held, again
for atomicity reasons. Furthermore, we have to keep the list of
Chans up to date when clients connect and disconnect.
To solve the issue that notifying all the clients requires a possibly
expensive walk over the list of Chans, we can use a broadcast
channel instead, where a broadcast channel is an ordinary Chan that
we create a copy of for each server thread using dupChan (see
“MVar as a Building Block: Unbounded Channels”). When an item is written to the broadcast channel,
it will appear on all the copies.
So in this design, the only shared state we need is a single broadcast
channel, which doesn’t even need to be stored in an MVar (because it
never changes). The messages sent on the broadcast channel are new
factor values. Because all server threads will see messages on this
channel in the same order, they all have a consistent view of the state.
newtype State = State { broadcastChan :: Chan Int }
However, there is one wrinkle with this design. The server thread
must listen both for events on the broadcast channel and for input from
the client. To merge these two kinds of events, we’ll need a Chan as
in the previous design, a receive thread to forward the client’s
input, and another thread to forward messages from the broadcast
channel. Hence this design needs a total of three threads per client.
The setup is summarized by the diagram in Figure 12-1.

We can improve on the previous design further by using STM. With STM,
we can avoid the broadcast channel by storing the current factor in a
single shared TVar:
newtype State = State { currentFactor :: TVar Int }
An STM transaction can watch for changes in the TVar's value using
the technique that we saw in “Blocking Until Something Changes”, so we
don’t need to explicitly send messages when it changes.
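Restating that technique as a small sketch (waitForChange is a hypothetical helper; check, from Control.Monad.STM, retries the transaction unless its argument is True):

waitForChange :: Eq a => TVar a -> a -> STM a
waitForChange tv old = do
  new <- readTVar tv
  check (new /= old)   -- blocks (retries) until the value changes
  return new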
Furthermore, as we saw in “Merging with STM”, we can merge multiple sources of events in STM without using extra threads. We do need a receive thread to forward input from the client because an STM transaction can’t wait for IO, but that’s all. This design needs two threads per client. The overall structure is depicted in Figure 12-2.

For concreteness, let's walk through the sequence of events that take place in this setup when a client issues a *N command:
1. The receive thread reads the *N command from the Handle and forwards it to the server thread's TChan.
2. The server thread receives the command from the TChan and modifies the shared TVar containing the current factor.
3. The change in the TVar is noticed by the other server threads, which all report the new value to their respective clients.
STM results in the simplest architecture, so we’ll develop our
solution using that. First, the main function, which has a couple
of changes compared with the previous version:
server2.hs
main = withSocketsDo $ do
  sock <- listenOn (PortNumber (fromIntegral port))
  printf "Listening on port %d\n" port
  factor <- atomically $ newTVar 2
  forever $ do
    (handle, host, port) <- accept sock
    printf "Accepted connection from %s: %s\n" host (show port)
    forkFinally (talk handle factor) (\_ -> hClose handle)

port :: Int
port = 44444
Here, we create the TVar containing the current factor, initially 2. The TVar is then passed as an argument to talk for each new client connection.
The talk function sets up the threads to handle the new client connection:
talk :: Handle -> TVar Integer -> IO ()
talk h factor = do
  hSetBuffering h LineBuffering
  c <- atomically newTChan
  race (server h factor c) (receive h c)
  return ()
This creates the new TChan that carries the client's input to the server thread, and then creates the server thread and the receive thread using race, so that if either thread returns or fails, the other is cancelled.
The receive function repeatedly reads a line from the Handle and
writes it to the TChan:
receive :: Handle -> TChan String -> IO ()
receive h c = forever $ do
  line <- hGetLine h
  atomically $ writeTChan c line
Next, we have the server thread, where most of the application logic
resides.
server :: Handle -> TVar Integer -> TChan String -> IO ()
server h factor c = do
    f <- atomically $ readTVar factor
    hPrintf h "Current factor: %d\n" f
    loop f
  where
    loop f = do
      action <- atomically $ do
        f' <- readTVar factor
        if (f /= f')
           then return (newfactor f')
           else do
             l <- readTChan c
             return (command f l)
      action

    newfactor f = do
      hPrintf h "new factor: %d\n" f
      loop f

    command f s
     = case s of
        "end" ->
          hPutStrLn h ("Thank you for using the " ++
                       "Haskell doubling service.")
        '*':s -> do
          atomically $ writeTVar factor (read s :: Integer)
          loop f
        line -> do
          hPutStrLn h (show (f * (read line :: Integer)))
          loop f
First, we read the current value of the factor and report it to the client. Then we enter the loop, which takes the last known value of the factor, f, as an argument. The overall structure of the loop is an STM transaction that returns an IO action, which we then execute; in most cases the IO action will end by calling loop again.[45]

In the transaction, first we read the current factor. Next, we compare it against the value we previously read, in f. If the two are different, indicating that the factor has been changed, then we return the newfactor action applied to the new value. If the factor has not been changed, we read from the TChan; because readTChan retries when the channel is empty, the whole transaction blocks until either the factor changes or a line of input arrives. The same logic can be expressed with an explicit orElse, as in the sketch below. (Convince yourself that the two versions do the same thing, and also consider why it isn't possible to always transform away an orElse like this.) Having read a line of input from the TChan, we return the command action applied to the current factor and the line.

The newfactor action reports the new factor to the client and re-enters the loop with the new value. The command action interprets a line from the client. If the client said end, we print a thank-you message and return without calling loop, so the server thread exits and the connection is closed. If the client requests a change in factor, then we update the global factor value and call loop with the old factor; the change will be noticed on the next iteration of the transaction and reported to this client along with all the others. Otherwise, the line should be a number, which we multiply by the current factor, send back to the client, and loop.
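The orElse version of the transaction would look something like this (a sketch; check is from Control.Monad.STM):

loop f = do
  action <- atomically $
    (do f' <- readTVar factor
        check (f /= f')        -- retry unless the factor has changed
        return (newfactor f'))
    `orElse`
    (do l <- readTChan c
        return (command f l))
  action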
Try this server yourself by compiling and running the server2.hs
program. Start up a few clients with the nc program (or another
suitable telnet-style application) and check that it is working as
expected. Test the error handling: what happens when you close the
client connection without sending the end command, or if you send a
non-number? You might want to add some additional debugging output to
various parts of the program in order to track more clearly what is happening.
Continuing on from the simple server examples in the previous sections, we now consider a more realistic example: a network chat server. A chat server enables multiple clients to connect and type messages to one another interactively. Real chat servers (e.g., IRC) have multiple channels and allow clients to choose which channels to participate in. For simplicity, we will be building a chat server that has a single channel, whereby every message is seen by every client.
The informal specification for the server is as follows:
Each line received from the client is interpreted as a command, which is one of the following:

/tell name message
    Sends message to the user name.

/kick name
    Disconnects the user name.[46]

/quit
    Disconnects the current client.

message
    Any other string (not beginning with /) is broadcast as a message to all the connected clients.
As in the factor example of the previous section, the requirements dictate that a server thread must act on events from multiple sources: input from the client over the network, /tell messages and broadcasts from other clients, being kicked by another client, and clients connecting or disconnecting.
The basic architecture will be similar. We need a receive thread to
forward the network input into a TChan and a server thread to wait
for the different kinds of events and act upon them. Compared to the
previous example, though, we have a lot more shared state. A client
needs to be able to send messages to any other client, so the set of
clients and their corresponding TChans must be shared.
We should consider how to handle /kick because we want to guarantee
that two clients cannot simultaneously kick each other. This implies
some synchronized, shared state for each client to
indicate whether it has been kicked. A server thread can then check
that it has not already been kicked itself before kicking another
client. To inform the victim that it has been kicked, we could send a
message to its TChan, but because we are using STM, we might as well
just watch the global state for changes as we did in the factor
example in the previous section.
Next, we need to consider how the various events (apart from /kick)
arrive at the server thread. There is input from the client over the
network and also messages from other clients to be sent back to this
client. We could use separate TChans for the different kinds of
events, but it is slightly better to use just one; the ordering on
events is retained, which makes things more predictable for the
client. So the design we have so far is a TVar to indicate whether
the client has been kicked and a TChan to carry both network input
and events from other clients.
Now that we have established the main architectural design, we can fill in the details. In the previous examples, we passed around the various pieces of state explicitly, but now that things are more complicated, it will help to separate the state into the global server state and the per-client state. The per-client state is defined as follows:
chat.hs
type ClientName = String

data Client = Client
  { clientName     :: ClientName
  , clientHandle   :: Handle
  , clientKicked   :: TVar (Maybe String)
  , clientSendChan :: TChan Message
  }
We have one TVar indicating whether this client has been
kicked (clientKicked). Normally, this TVar contains Nothing, but
after the client is kicked, the TVar contains Just s, where s
is a string describing the reason for the client being kicked.
The TChan clientSendChan carries all the other messages that may be
sent to a client. These have type Message:
data Message = Notice String
             | Tell ClientName String
             | Broadcast ClientName String
             | Command String
Where, respectively: Notice is a message from the server,
Tell is a private message from another client, Broadcast is a
public message from another client, and Command is a line of text
received from the user (via the receive thread).
We need a way to construct a new instance of Client, which is straightforward:
newClient :: ClientName -> Handle -> STM Client
newClient name handle = do
  c <- newTChan
  k <- newTVar Nothing
  return Client { clientName     = name
                , clientHandle   = handle
                , clientSendChan = c
                , clientKicked   = k
                }
Next, we define a useful function for sending a Message to a given
Client:
sendMessage :: Client -> Message -> STM ()
sendMessage Client{..} msg =
  writeTChan clientSendChan msg
The syntax Client{..} is a record wildcard pattern, which brings
into scope all the fields of the Client record with their
declared names. In this case, we are using only clientSendChan, but
when there are lots of fields it is a convenient shorthand, so we will
be using it quite often from here on. (Remember to enable the
RecordWildCards extension to use this syntax.)
Note that this function is in the STM monad, not IO. We
will be using it inside some STM transactions later.
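For example, calling it from IO just means wrapping it in atomically (sendNotice is a hypothetical helper, not part of the sample code):

sendNotice :: Client -> String -> IO ()
sendNotice client s = atomically (sendMessage client (Notice s))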
The data structure that stores the server state is just a TVar
containing a mapping from ClientName to Client.
data Server = Server
  { clients :: TVar (Map ClientName Client)
  }

newServer :: IO Server
newServer = do
  c <- newTVarIO Map.empty
  return Server { clients = c }
This state must be accessible from all the clients, because each client needs to be able to broadcast to all the others. Furthermore, new clients need to ensure that they are choosing a username that is not already in use and hence the set of active usernames is shared knowledge.
Here is how we broadcast a Message to all the clients:
broadcast :: Server -> Message -> STM ()
broadcast Server{..} msg = do
  clientmap <- readTVar clients
  mapM_ (\client -> sendMessage client msg) (Map.elems clientmap)
Now we will work top-down and write the code of the server. The
main function is almost identical to the one in the previous section:
main :: IO ()
main = withSocketsDo $ do
  server <- newServer
  sock <- listenOn (PortNumber (fromIntegral port))
  printf "Listening on port %d\n" port
  forever $ do
    (handle, host, port) <- accept sock
    printf "Accepted connection from %s: %s\n" host (show port)
    forkFinally (talk handle server) (\_ -> hClose handle)

port :: Int
port = 44444
The only difference is that we create a new empty server
state up front by calling newServer and pass this to each new
client as an argument to talk.
When a new client connects, we need to do the following tasks:
- Ask the client for a username.
- Create a new Client and insert it into the Server state, ensuring that the Client will be removed when it disconnects or any failure occurs.
Let’s start by defining an auxiliary function checkAddClient, which
takes a username and attempts to add a new client with that name to
the state, returning Nothing if a client with that name already
exists, or Just client if the addition was successful. It also
broadcasts the event to all the other connected clients:
checkAddClient :: Server -> ClientName -> Handle -> IO (Maybe Client)
checkAddClient server@Server{..} name handle = atomically $ do
  clientmap <- readTVar clients
  if Map.member name clientmap
    then return Nothing
    else do client <- newClient name handle
            writeTVar clients $ Map.insert name client clientmap
            broadcast server $ Notice (name ++ " has connected")
            return (Just client)
And we will need a corresponding removeClient that removes
the client again:
removeClient :: Server -> ClientName -> IO ()
removeClient server@Server{..} name = atomically $ do
  modifyTVar' clients $ Map.delete name
  broadcast server $ Notice (name ++ " has disconnected")
Now we can put the pieces together. Unfortunately we can’t reach for
the usual tool for these situations, namely bracket, because our
“resource acquisition” (checkAddClient) is conditional. So we
need to write the code out explicitly:
talk :: Handle -> Server -> IO ()
talk handle server@Server{..} = do
    hSetNewlineMode handle universalNewlineMode
        -- Swallow carriage returns sent by telnet clients
    hSetBuffering handle LineBuffering
    readName
  where
    readName = do
      hPutStrLn handle "What is your name?"
      name <- hGetLine handle
      if null name
        then readName
        else do ok <- checkAddClient server name handle
                case ok of
                  Nothing -> do
                    hPrintf handle
                      "The name %s is in use, please choose another\n" name
                    readName
                  Just client ->
                    runClient server client
                      `finally` removeClient server name
After reading the requested username from the client, we attempt to add it to the server state with checkAddClient. If that fails because the name is already in use, we print a message to the client and recursively call readName to ask for another name. If it succeeds, we call a function named runClient (which we will define shortly) to handle the client's requests, using finally to ensure that the client is removed from the state when runClient returns or throws an exception.
This is almost right, but strictly speaking we should mask
asynchronous exceptions to eliminate the possibility that an exception
is received just after checkAddClient but before runClient, which
would leave a stale client in the state. This is what bracket would
have done for us, but because we’re rolling our own logic here, we have
to handle the exception safety, too (for reference, the definition of
bracket is given in “Asynchronous Exception Safety for Channels”).
The correct version of readName is as follows:
readName = do
  hPutStrLn handle "What is your name?"
  name <- hGetLine handle
  if null name
    then readName
    else mask $ \restore -> do
           ok <- checkAddClient server name handle
           case ok of
             Nothing -> restore $ do
               hPrintf handle
                 "The name %s is in use, please choose another\n" name
               readName
             Just client ->
               restore (runClient server client)
                 `finally` removeClient server name
We mask asynchronous exceptions before calling checkAddClient, and we restore them again before printing the error message and trying again if the name was already in use. If the name is accepted, we restore asynchronous exceptions when calling runClient, but crucially the finally that arranges to call removeClient is attached inside the mask, so there is no window in which an exception can strike after checkAddClient succeeds but before the cleanup handler is installed.
Having initialized the client, created the Client data structure, and
added it to the Server state, we now need to create the client
threads themselves and start processing events. The main
functionality of the client will be implemented in a function called
runClient:
runClient :: Server -> Client -> IO ()
runClient returns or throws an exception only when the client
is to be disconnected. Recall that we need two threads per client: a
receive thread to read from the network socket and a server
thread to listen for messages from other clients and to send messages
back over the network. As before, we can use race to create the two
threads with a sibling relationship so that if either thread returns
or fails, the other will be cancelled.
runClient :: Server -> Client -> IO ()
runClient serv@Server{..} client@Client{..} = do
    race server receive
    return ()
  where
    receive = forever $ do
      msg <- hGetLine clientHandle
      atomically $ sendMessage client (Command msg)

    server = join $ atomically $ do
      k <- readTVar clientKicked
      case k of
        Just reason -> return $
          hPutStrLn clientHandle $ "You have been kicked: " ++ reason
        Nothing -> do
          msg <- readTChan clientSendChan
          return $ do
            continue <- handleMessage serv client msg
            when continue $ server
So runClient is just race applied to the server and
receive threads. In the receive thread, we read one line at a time
from the client’s Handle and forward it to the server thread as a
Command message.
In the server thread, we have a transaction that tests two pieces of
state: first, the clientKicked TVar, to see whether this client
has been kicked. If it has not, then we take the next message from
clientSendChan and act upon it. Note that this time, we have
expressed server using join applied to the STM transaction: the
join function is from Control.Monad and has the following type:
join :: Monad m => m (m a) -> m a
Here, m is instantiated to IO. The STM transaction
returns an IO action, which is run by join, and in most cases this
IO action returned will recursively invoke server.
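As a tiny illustration of the pattern (not from the sample code), a transaction can decide which IO action to run next:

example :: IO ()
example = join $ atomically $
  return (putStrLn "hello")   -- behaves exactly like putStrLn "hello"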
The handleMessage function acts on a message and
is entirely straightforward:
handleMessage :: Server -> Client -> Message -> IO Bool
handleMessage server client@Client{..} message =
  case message of
    Notice msg         -> output $ "*** " ++ msg
    Tell name msg      -> output $ "*" ++ name ++ "*: " ++ msg
    Broadcast name msg -> output $ "<" ++ name ++ ">: " ++ msg
    Command msg ->
      case words msg of
        ["/kick", who] -> do
            atomically $ kick server who clientName
            return True
        "/tell" : who : what -> do
            tell server client who (unwords what)
            return True
        ["/quit"] ->
            return False
        ('/':_):_ -> do
            hPutStrLn clientHandle $ "Unrecognized command: " ++ msg
            return True
        _ -> do
            atomically $ broadcast server $ Broadcast clientName msg
            return True
  where
    output s = do hPutStrLn clientHandle s; return True
Note that the function returns a Bool to indicate whether the caller
should continue to handle more messages (True) or exit (False).
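The kick and tell functions are not shown in this excerpt. The sketches below are consistent with how handleMessage calls them, although the sample code may differ in details:

kick :: Server -> ClientName -> ClientName -> STM ()
kick Server{..} who by = do
  clientmap <- readTVar clients
  case Map.lookup who clientmap of
    Nothing     -> return ()   -- victim not connected; a real server would notify 'by'
    Just victim -> writeTVar (clientKicked victim) (Just ("by " ++ by))

tell :: Server -> Client -> ClientName -> String -> IO ()
tell Server{..} Client{..} who msg = do
  ok <- atomically $ do
    clientmap <- readTVar clients
    case Map.lookup who clientmap of
      Nothing     -> return False
      Just target -> do sendMessage target (Tell clientName msg)
                        return True
  unless ok $
    hPutStrLn clientHandle (who ++ " is not connected.")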
We have now given most of the code for the chat server. The full code is less than 250 lines total, which is not at all bad considering that we have implemented a complete and usable chat server. Moreover, without changes the server will scale to many thousands of connections and can make use of multiple CPUs if they are available.
There were two tools that helped a lot here:
- STM, which let us manage the shared state with composable transactions and wait for state changes without explicit wake-up messages.
- race, which let us create the pairs of sibling threads we needed, with the guarantee that if either thread returns or fails, the other is cancelled.
Care should be taken with STM with respect to performance, though. Take a look at the definition of broadcast in “Server Data”. It is an STM
transaction that operates on an unbounded number of TChans and thus
builds an unbounded transaction. We noted earlier in
“Performance” that long transactions should be avoided because
they cost O(n²). Hence, broadcast should be reimplemented to
avoid this. As an exercise, why not try to fix this yourself: one way to do it
would be to use a broadcast channel.
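A sketch of that approach, using the broadcast-channel support in Control.Concurrent.STM.TChan (newBroadcastTChanIO creates a write-only channel; each reader takes its own copy with dupTChan, and a write costs O(1) regardless of the number of readers):

import Control.Concurrent.STM

demo :: IO ()
demo = do
  bc   <- newBroadcastTChanIO          -- shared write end
  copy <- atomically (dupTChan bc)     -- one copy per server thread
  atomically (writeTChan bc "hello")   -- O(1), however many readers exist
  msg  <- atomically (readTChan copy)
  putStrLn msg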
[43] It is provided by
Control.Concurrent in GHC 7.6.1 and later.
[44] nc is the netcat program, which is useful for simple network
interaction. You can also use telnet if nc is not available.
[45] In fact, this pattern is more
succinctly expressed using Control.Monad.join, but here it is
written without join for clarity.
[46] In real chat servers, this command would typically be available only to privileged users, but for simplicity here we will allow any user to kick any other user.