In this example, we describe the implementation of a simple web-application server.
There are two things we are concerned with: First, we would like to use scalable, multiplexed,
asynchronous IO (through the Java NIO API) with a syntax that resembles
that of blocking (readLine-style) IO. And second, we want to handle advanced
application control flow across individual HTTP requests.
An HTTP server works as follows: for each incoming connection on a specified port, it reads requests and replies with responses until the connection is closed. The main loop of our server is the following program code:
    #
    # Main program
    #
    for socketChannel in acceptConnections(8080): {
        Trace.print("Connect: $socketChannel");
        for req in readRequests(socketChannel): {
            res = handleRequest(req);
            writeResponse(socketChannel, res);
        }
        Trace.print("Disconnect: $socketChannel");
    }
There are a number of things to be concerned with, though. Most importantly, no real HTTP server would implement this control loop in a sequential fashion, as there could be only one client connected at a time (or all connections would have to be closed after answering the first request). A number of real-world servers solve this problem by starting a new thread for each connection, but this leads to scalability problems if there are a lot of concurrently connected users.
The solution of choice is to limit the number of threads and to handle IO asynchronously, multiplexing state-change events from several connections. The Java NIO package provides an efficient implementation of such an event-multiplexing IO library, but unfortunately its API is far harder to use than the customary blocking stream interface.
Our goal is to use the above code as the server main loop and have it run on fully asynchronous, multiplexed IO.
In the Java NIO API, there is a central Selector class,
whose instances track updates on a number of SelectionKey
objects. We define a generator that repeatedly executes a non-blocking select
(selectNow) on a given selector and, whenever keys are ready, yields the set of
selected keys:
    new selections;
    def selections(selector): {
        new gen;
        def gen(yield): {
            count = selector->selectNow();
            if count > 0:
                yield(selector->selectedKeys());
            gen(yield);
        }
        return gen;
    }
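For readers who want to relate the generator to the underlying API, here is a minimal sketch in plain Java rather than in the language used in this tutorial. The names SelectionPoll, pollOnce and demo are made up for illustration; only Selector, SelectionKey and Pipe are actual java.nio.channels types. One call to pollOnce roughly corresponds to one step of gen, with the difference that it copies the ready keys and clears the selection itself.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;

final class SelectionPoll {
    // One non-blocking select: returns a snapshot of the keys that are
    // ready right now (empty if none) and clears the selected-key set.
    static List<SelectionKey> pollOnce(Selector selector) {
        try {
            List<SelectionKey> ready = new ArrayList<>();
            if (selector.selectNow() > 0) {
                ready.addAll(selector.selectedKeys());
                selector.selectedKeys().clear();
            }
            return ready;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hypothetical demo: an in-process pipe stands in for a socket.
    // Returns the number of ready keys seen by a single poll.
    static int demo(boolean writeFirst) {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            pipe.source().register(selector, SelectionKey.OP_READ);
            if (writeFirst) {
                pipe.sink().write(ByteBuffer.wrap(new byte[] {42}));
            }
            int readyCount = pollOnce(selector).size();
            pipe.sink().close();
            pipe.source().close();
            return readyCount;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling SelectionPoll.demo(true) polls a pipe that already holds one unread byte, while SelectionPoll.demo(false) polls an idle pipe.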
In a second step, we define a function that creates a new selector object and, in parallel to returning the selector to the caller, starts a loop that iterates over the above selection-generator. For each set of selected keys, each key's attached callback handler is invoked and after that, the selection is cleared.
    new createAsyncSelector;
    def createAsyncSelector(): {
        selector = SelectorProviderClass.provider()->openSelector();
        {
            for keySet in selections(selector): {
                for k in keySet->iter(): {
                    Trace.print("Select: $k");
                    handler = k->attachment();
                    handler(k);
                }
                keySet->clear();
            }
        } & return selector;
    }
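The attachment mechanism used above is part of the plain NIO API: any object can be attached to a key at registration time and retrieved later with attachment(). The following Java sketch shows one dispatch pass under that convention. KeyHandler, dispatchReady and demo are hypothetical names chosen for this example, not part of NIO or of the tutorial code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;

final class HandlerDispatch {
    // Hypothetical handler type; the tutorial attaches a plain function.
    interface KeyHandler {
        void handle(SelectionKey key);
    }

    // Invokes the handler attached to every ready key, then clears the
    // selection. Returns the number of handlers that were invoked.
    static int dispatchReady(Selector selector) {
        try {
            if (selector.selectNow() == 0) {
                return 0;
            }
            int invoked = 0;
            for (SelectionKey key : selector.selectedKeys()) {
                ((KeyHandler) key.attachment()).handle(key);
                invoked++;
            }
            selector.selectedKeys().clear();
            return invoked;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hypothetical demo: registers a pipe with a logging handler, makes
    // the pipe readable and runs one dispatch pass.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            KeyHandler onReadable = key -> log.add("readable");
            pipe.source().register(selector, SelectionKey.OP_READ, onReadable);
            pipe.sink().write(ByteBuffer.wrap(new byte[] {1}));
            dispatchReady(selector);
            pipe.sink().close();
            pipe.source().close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return log;
    }
}
```

Unlike createAsyncSelector, this sketch runs a single pass on the calling thread; looping over it in a dedicated thread would be one way to approximate the forked loop.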
So far, we have automated the execution of callback handlers, but we have not introduced more fine-grained concurrency.
What we are going to do now is to wrap up each callback handler in a generator that
produces a value each time that specific callback is invoked. Implementing this
in a generalized manner allows us to introduce the concurrency we are striving
for: In parallel to the yield
call, the callback returns, allowing other callbacks to be handled concurrently
and new select invocations to occur.
In order to prevent the same event from being handled multiple times, the
interest set of the corresponding selection key is cleared until the
callback is fully handled, so the selector ignores the key in the meantime.
In effect, invocations of each callback handler are
serialized and executed in their own conceptual thread (which, of course,
has nothing to do with how the runtime might utilize platform threads).
    new callbacks;
    def callbacks(channel, selector, ops): {
        new gen;
        def gen(yield): {
            new callback;
            def callback(key): {
                key->interestOps(0);
                {
                    yield(key);
                    if key->isValid(): {
                        key->interestOps(ops);
                        selector->wakeup();
                    }
                    else:
                        returnto.gen();
                } & return();
            }
            selectionKey = channel->register(selector, ops, callback);
        }
        return gen;
    }
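The effect of toggling the interest set can be observed in isolation. The Java sketch below assumes a pipe with one unread byte as the event source; suspend, resume and demo are hypothetical helper names. While the interest set is 0, the selector does not report the key even though data is pending, and once the original operations are restored, the key is reported again.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class InterestToggle {
    // Stop the selector from reporting this key while its event is handled.
    static void suspend(SelectionKey key) {
        key.interestOps(0);
    }

    // Re-arm the key and wake up a selector that may be blocked in select().
    static void resume(SelectionKey key, int ops) {
        if (key.isValid()) {
            key.interestOps(ops);
            key.selector().wakeup();
        }
    }

    // Returns {ready count while suspended, ready count after resume}.
    static int[] demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);
            // The byte is never read, so the source stays readable.
            pipe.sink().write(ByteBuffer.wrap(new byte[] {1}));

            suspend(key);
            int whileSuspended = selector.selectNow();
            resume(key, SelectionKey.OP_READ);
            int afterResume = selector.selectNow();

            pipe.sink().close();
            pipe.source().close();
            return new int[] {whileSuspended, afterResume};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The wakeup() call matters only when another thread is blocked in select(); in this single-threaded demo it is harmless.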
With this callback generator at hand, we can implement the acceptConnections
generator. Note the fork yield statement, which, by allowing the
callback-generator to return, re-enables the flag that tells the server socket
to listen for additional connection attempts.
    new acceptConnections;
    def acceptConnections(port): {
        new gen;
        def gen(yield): {
            serverSocketChannel = ServerSocketChannelClass.open();
            serverSocketChannel->configureBlocking(false);
            isa = InetSocketAddress(port);
            serverSocketChannel->socket()->bind(isa);
            for key in serverSocketChannel->callbacks(selector, SelectionKeyClass.OP_ACCEPT): {
                serverSocketChannel = key->channel();
                socketChannel = serverSocketChannel->accept();
                socketChannel->configureBlocking(false);
                (fork yield) (socketChannel);
            }
            return();
        }
        return gen;
    }
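For comparison, the same setup steps look as follows when written directly against the Java API. This is only a sketch under a few assumptions: the class and method names are invented, the demo binds to the loopback interface on an ephemeral port instead of port 8080, and accepted connections are closed immediately, where a real server would register them for OP_READ.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class AcceptSketch {
    // Opens a non-blocking listening channel and registers it for OP_ACCEPT.
    static ServerSocketChannel listen(Selector selector, InetSocketAddress address)
            throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(address);
        server.register(selector, SelectionKey.OP_ACCEPT);
        return server;
    }

    // Accepts every connection that is pending right now; returns the count.
    static int acceptPending(ServerSocketChannel server) throws IOException {
        int accepted = 0;
        SocketChannel client;
        // In non-blocking mode accept() returns null when nothing is pending.
        while ((client = server.accept()) != null) {
            client.configureBlocking(false);
            client.close(); // placeholder for registering the channel for OP_READ
            accepted++;
        }
        return accepted;
    }

    // Hypothetical demo: one client connects over loopback and is accepted.
    static int demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = listen(selector,
                     new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
             SocketChannel peer = SocketChannel.open(server.getLocalAddress())) {
            selector.select(2000); // wait until the connection can be accepted
            return acceptPending(server);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```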
In the main-loop description above, there is a function readRequests
and a function writeResponse, as well as a function handleRequest.
Since we do not really care about the internals of the HTTP protocol, we skip a detailed
explanation of these functions. Instead, we take a closer look at a buffered reader
implementation which
provides a function readLine,
which is central to the parsing of HTTP requests and a good example of avoiding
the inversion of control usually incurred by asynchronous IO.
Just like the generator that yielded incoming connections, we can implement a generator that yields on newly received data:
    new readBytes;
    def readBytes(socketChannel, bufSize): {
        new gen;
        def gen(yield): {
            buffer = ByteBufferClass.allocateDirect(bufSize);
            for key in socketChannel->callbacks(selector, SelectionKeyClass.OP_READ): {
                count = socketChannel->read(buffer);
                if count < 0: {
                    socketChannel->close();
                    returnto.gen();
                }
                buffer->flip();
                yield(buffer);
                buffer->clear();
            }
            return();
        }
        return gen;
    }
With this generator, which is set up to repeatedly call the function
inputReceived, we can handle buffered reading, using
the following rendezvous scheme:
    new BufferedDecoder;
    def BufferedDecoder(socketChannel):
        return BufferedDecoder(socketChannel, 4096);
    def BufferedDecoder(socketChannel, bufSize): {
        new bufferState;
        new inputReceived;
        def bufferState(charBuffer, waitingForInput) & inputReceived(byteBuffer): {
            # update charBuffer with data from byteBuffer, possibly
            # compacting/resizing
            # ...
            bufferState(charBuffer, false) & return();
        }
        new this.readLine;
        def bufferState(charBuffer, false) & this.readLine(): {
            lineMatcher = PatternClass.compile("(.*)\r?\n")->matcher(charBuffer);
            if lineMatcher->find(): {
                line = lineMatcher->group(1);
                bufferState(charBuffer, false) & return(line);
            }
            else:
                bufferState(charBuffer, true) & return(this.readLine());
        }
        bufferState(CharBufferClass.allocate(bufSize), true) & readBytes(socketChannel, bufSize)->foreach(inputReceived) & return this;
    }
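The buffer bookkeeping that is elided in the listing above (appending decoded input and removing a line once it has been returned) could look roughly like the following Java sketch. It is not the tutorial's implementation: LineBuffer, feed, pollLine and bytes are hypothetical names, a StringBuilder replaces the CharBuffer, and ISO-8859-1 is assumed so that no character can be split across two chunks. Instead of blocking, pollLine returns null when more input is needed.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class LineBuffer {
    // Same pattern as in the tutorial: one line, terminated by LF or CRLF.
    private static final Pattern LINE = Pattern.compile("(.*)\r?\n");
    private final StringBuilder chars = new StringBuilder();

    // Counterpart of inputReceived: append the newly received bytes.
    // Assumption: ISO-8859-1, i.e. exactly one byte per character.
    void feed(ByteBuffer bytes) {
        chars.append(StandardCharsets.ISO_8859_1.decode(bytes));
    }

    // Returns the next complete line without its terminator and removes
    // it from the buffer, or null if no complete line has arrived yet.
    String pollLine() {
        Matcher matcher = LINE.matcher(chars);
        if (!matcher.find()) {
            return null;
        }
        String line = matcher.group(1);
        chars.delete(0, matcher.end());
        return line;
    }

    // Helper for trying the class out with string literals.
    static ByteBuffer bytes(String text) {
        return ByteBuffer.wrap(text.getBytes(StandardCharsets.ISO_8859_1));
    }
}
```

Feeding the chunks "GET / HT" and "TP/1.1\r\nHost: x\n" one after the other yields no line after the first chunk and the two lines "GET / HTTP/1.1" and "Host: x" after the second.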
Calls to readLine now behave in the well-known, synchronously
blocking way one would expect, despite the underlying asynchronous implementation.
A part of the request parsing function is shown below:
    new readRequests;
    def readRequests(socketChannel): {
        reader = BufferedDecoder(socketChannel);
        new gen;
        def gen(yield): {
            # example first line: GET /foo/bar HTTP/1.1
            requestLine = reader.readLine()->split(" ")->asList();
            req.httpMethod = requestLine->get(0);
            req.httpLocation = requestLine->get(1);
            req.httpVersion = requestLine->get(2);
            # read headers and message body
            # ...
            yield(req);
            gen(yield);
        }
        return gen;
    }
The stateless request/response nature of the HTTP protocol is well-suited for the exchange of independent units of information, but coordinating a multi-request user interaction is a challenge, all the more so because the user may at any time open a new browser window or hit the Back or Reload button.
Adopting the (rather non-standard) approach of some web frameworks such as Seaside, we can provide server-side cross-request scripting with a blocking send-page-and-wait metaphor. Each time a page is sent to the client, the server application logic is interrupted, saving a continuation that may be resumed upon receiving a future request.
We illustrate this approach by implementing a little guess-a-number game, asking the user to guess a previously chosen number and giving hints on whether to guess lower or higher. Once the user guesses correctly, the application reports the number of guesses needed and starts over with a fresh random number.




The code we will use is very similar to an implementation targeting a
text-based console with printLine and
readLine calls.
    numberGen = Random();
    new playGame;
    def playGame(s): {
        num = numberGen->nextInt(100);
        s = s->sendHtmlFormPage("Guess my number!", "It's between 0 and 100", ["Guess"]);
        new getAnswer;
        def getAnswer(s, n): {
            x = IntegerClass.parseInt(s.req.params->get("Guess"));
            if (x < num): {
                s = s->sendHtmlFormPage("Guess higher!", "$x is too low", ["Guess"]);
                returnto.getAnswer(getAnswer(s, n+1));
            }
            if (x > num): {
                s = s->sendHtmlFormPage("Guess lower!", "$x is too high", ["Guess"]);
                returnto.getAnswer(getAnswer(s, n+1));
            }
            if (x == num): {
                s = s->sendHtmlFormPage("Correct!", "$x is the answer (it took you $n tries to find out)", []);
                returnto.getAnswer(s);
            }
        }
        s = getAnswer(s, 1);
        return playGame(s);
    }
As we can easily see, the function sendHtmlFormPage plays a central part in the code above.
To understand the underlying mechanism, we first take a look at how the function handleRequest
(called from the main server loop) is implemented.
    handlerMap = HashMap();
    handlerMap->put("/", playGame);
    new handleRequest;
    def handleRequest(req): {
        handlerID = if req.params->containsKey("conversationID") then req.params->get("conversationID") else req.httpLocation;
        handler = if handlerMap->containsKey(handlerID) then handlerMap->get(handlerID) else errorHandler;
        session = req: req, resume: return;
        handler(session);
    }
The function handleRequest never returns by itself. Instead, the current
continuation (return) is saved in a session record, together with the incoming request.
Depending on the requested URL (req.httpLocation) or on a conversation id, if supplied,
a request handler is chosen, which is then called with the session record as its parameter. So in order
to have handleRequest return, which will trigger the delivery of actual page content
to the client by means of writeResponse, the selected handler must, at some point,
call session.resume with an appropriate response record.
This is exactly what sendHtmlFormPage does. In addition, it registers its own return
continuation as a fresh request handler, saving the auto-generated ID as conversation id in a
hidden form field. Upon submitting the form, it is the return continuation of
this call to sendHtmlFormPage which will be invoked by handleRequest, resuming
the interrupted program directly after the sendHtmlFormPage call.
    new sendHtmlFormPage;
    def sendHtmlFormPage(session, head, body, fields): {
        targetID = registerHandler(return);
        inputFields = ("""<tr><td>${name}:</td><td><input type="text" name="${name}" /></td></tr>""" for name in fields)->reduce(strcat, "");
        res.httpStatus = "200 OK";
        res.body = """
            <html>
              <head>
                <title>${head}</title>
              </head>
              <body>
                <h1>${head}</h1>
                <p>${body}</p>
                <form method="POST">
                  <input type="hidden" name="conversationID" value="${targetID}" />
                  <table border="0">
                    ${inputFields}
                  </table>
                  <input type="submit" name="submit" value="OK" />
                </form>
              </body>
            </html>
        """;
        session.resume(res);
    }
With this continuation-based programming model, back-button usage is supported automatically, since previously saved continuations can be resumed at any time and possibly more than once. At times, though, this may not be desired. In this number game, for example, we can cheat on the number of guesses by hitting the back-button and resubmitting the form:



Invalidating continuations is up to the application, though, as it is mainly a matter of business-logic. Implementation-wise, we could create one-shot continuations, or we could employ the software-transaction model described in the Bankaccount tutorial.
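To make the one-shot idea concrete, here is a minimal Java sketch of a handler registry in which every registered handler can be taken out exactly once. It illustrates the principle only: Java has no first-class continuations, so ordinary functions stand in for them, and OneShotHandlers, register and take are hypothetical names (register plays the role of registerHandler above). The sequential IDs are an assumption made for brevity; a real application would want IDs that cannot be guessed.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

final class OneShotHandlers<Q, R> {
    private final Map<String, Function<Q, R>> pending = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    // Stores the handler under a fresh conversation ID and returns the ID.
    String register(Function<Q, R> handler) {
        String id = "c" + nextId.incrementAndGet();
        pending.put(id, handler);
        return id;
    }

    // Removes and returns the handler, so that a second request carrying
    // the same ID (Back button, form resubmission) finds nothing.
    Optional<Function<Q, R>> take(String id) {
        return Optional.ofNullable(pending.remove(id));
    }
}
```

A request whose conversation ID is no longer present could then be routed to an error page or to a fresh game, which is exactly the kind of decision that belongs to the application's business logic.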