Tuesday, September 22, 2009

Memory Mapped Buffers and Non-blocking IO in Java

The new I/O (input/output) packages finally address Java's long-standing shortcomings in its high-performance, scalable I/O. The new I/O packages -- java.nio.* -- allow Java applications to handle thousands of open connections while delivering scalability and excellent performance. These packages introduce four key abstractions that work together to solve the problems of traditional Java I/O:
  1. A Buffer contains data in a linear sequence for reading or writing. A special buffer provides for memory-mapped file I/O.
  2. A charset maps Unicode character strings to and from byte sequences. (Yes, this is Java's third shot at character conversion.)
  3. Channels -- which can be sockets, files, or pipes -- represent a bidirectional communication pipe.
  4. Selectors multiplex asynchronous I/O operations into one or more threads.

A quick review

Before diving into the new API's gory details, let's review Java I/O's old style. Imagine a basic network daemon. It needs to listen to a ServerSocket, accept incoming connections, and service each connection. Assume for this example that servicing a connection involves reading a request and sending a response. That resembles the way a Web server works. Figure 1 depicts the server's lifecycle. At each heavy black line, the I/O operation blocks -- that is, the operation call won't return until the operation completes.

Figure 1. Blocking points in a typical Java server
Let's take a closer look at each step.
Creating a ServerSocket is easy:
ServerSocket server = new ServerSocket(8001);


Accepting new connections is just as easy, but with a hidden catch:
Socket newConnection = server.accept();


The call to server.accept() blocks until the ServerSocket accepts an incoming network connection. That leaves the calling thread sitting for an indeterminate length of time. If this application has only one thread, it does a great impression of a system hang.
Once the incoming connection has been accepted, the server can read a request from that socket, as shown in the code below. Don't worry about the Request object. It is a fiction invented to keep this example simple.
InputStream in = newConnection.getInputStream();
InputStreamReader reader = new InputStreamReader(in);
LineNumberReader lnr = new LineNumberReader(reader);
Request request = new Request();
while(!request.isComplete()) {
  String line = lnr.readLine();
  request.addLine(line);
}
This harmless-looking chunk of code features problems. Let's start with blocking. The call to lnr.readLine() eventually filters down to call SocketInputStream.read(). There, if data waits in the network buffer, the call immediately returns some data to the caller. If there isn't enough data buffered, then the call to read blocks until enough data is received or the other computer closes the socket. Because LineNumberReader asks for data in chunks (it extends BufferedReader), it might just sit around waiting to fill a buffer, even though the request is actually complete. The tail end of the request can sit in a buffer that LineNumberReader has not returned.

This code fragment also creates too much garbage, another big problem. LineNumberReader creates a buffer to hold the data it reads from the socket, but it also creates Strings to hold the same data. In fact, internally, it creates a StringBuffer. LineNumberReader reuses its own buffer, which helps a little. Nevertheless, all the Strings quickly become garbage.
Now it's time to send the response. It might look something like this (imagine that the Response object creates its stream by locating and opening a file):
Response response = request.generateResponse();
OutputStream out = newConnection.getOutputStream();
InputStream in = response.getInputStream();
int ch;
while(-1 != (ch = in.read())) {
  out.write(ch);
}
newConnection.close();
This code suffers from only two problems. Again, the read and write calls block. Writing one character at a time to a socket slows the process, so the stream should be buffered. Of course, if the stream were buffered, then the buffers would create more garbage.

You can see that even this simple example features two problems that won't go away: blocking and garbage.

The old way to break through blocks

The usual approach to dealing with blocking I/O in Java involves threads -- lots and lots of threads. You can simply create a pool of threads waiting to process requests, as shown in Figure 2.

Figure 2. Worker threads to handle requests
Threads allow a server to handle multiple connections, but they still cause trouble. First, threads are not cheap. Each has its own stack and receives some CPU allocation. As a practical matter, a JVM might create dozens or even a few hundred threads, but it should never create thousands of them.
In a deeper sense, you don't need all those threads. They do not efficiently use the CPU. In a request-response server, each thread spends most of its time blocked on some I/O operation. These lazy threads offer an expensive approach to keeping track of each request's state in a state machine. The best solution would multiplex connections and threads so a thread could order some I/O work and go on to something productive, instead of just waiting for the I/O work to complete.

New I/O, new abstractions

Now that we've reviewed the classic approach to Java I/O, let's look at how the new I/O abstractions work together to solve the problems we've seen with the traditional approach.
Along with each of the following sections, I refer to sample code (available in Resources) for an HTTP server that uses all these abstractions. Each section builds on the previous sections, so the final structure might not be obvious from just the buffer discussion.

Buffered to be easier on your stomach

Truly high-performance server applications must obsess about garbage collection. The unattainable ideal server application would handle a request and response without creating any garbage. The more garbage the server creates, the more often it must collect garbage. The more often it collects garbage, the lower its throughput.

Of course, it's impossible to avoid creating garbage altogether; you need to just manage it the best way you know how. That's where buffers come in. Traditional Java I/O wastes objects all over the place (mostly Strings). The new I/O avoids this waste by using Buffers to read and write data. A Buffer is a linear, sequential dataset and holds only one data type according to its class:
java.nio.Buffer Abstract base class
java.nio.ByteBuffer Holds bytes. Can be direct or nondirect. Can be read from a ReadableByteChannel. Can be written to a WritableByteChannel.
java.nio.MappedByteBuffer Holds bytes. Always direct. Contents are a memory-mapped region of a file.
java.nio.CharBuffer Holds chars. Cannot be written to a Channel.
java.nio.DoubleBuffer Holds doubles. Cannot be written to a Channel.
java.nio.FloatBuffer Holds floats. Can be direct or nondirect.
java.nio.IntBuffer Holds ints. Can be direct or nondirect.
java.nio.LongBuffer Holds longs. Can be direct or nondirect.
java.nio.ShortBuffer Holds shorts. Can be direct or nondirect.


Table 1. Buffer classes
You allocate a buffer by calling either allocate(int capacity) or allocateDirect(int capacity) on a concrete subclass. As a special case, you can create a MappedByteBuffer by calling FileChannel.map(int mode, long position, int size).
A direct buffer allocates a contiguous memory block and uses native access methods to read and write its data. When you can arrange it, a direct buffer is the way to go. Nondirect buffers access their data through Java array accessors. Sometimes you must use a nondirect buffer -- when using any of the wrap methods (like ByteBuffer.wrap(byte[])) -- to construct a Buffer on top of a Java array, for example.
When you allocate the Buffer, you fix its capacity; you can't resize these containers. Capacity refers to the number of primitive elements the Buffer can contain. Although you can put multibyte data types (short, int, float, long, and so on) into a ByteBuffer, its capacity is still measured in bytes. The ByteBuffer converts larger data types into byte sequences when you put them into the buffer. (See the next section for a discussion about byte ordering.) Figure 3 shows a brand new ByteBuffer created by the code below. The buffer features a capacity of eight bytes.
ByteBuffer example = ByteBuffer.allocateDirect(8);

Figure 3. A fresh ByteBuffer


The Buffer's position is the index of the next element that will be written or read. As you can see in Figure 3, position starts at zero for a newly allocated Buffer. As you put data into the Buffer, position climbs toward the limit. Figure 4 shows the same buffer after the calls in the next code fragment add some data.
example.put( (byte)0xca );
example.putShort( (short)0xfeba );
example.put( (byte)0xbe );

Figure 4. ByteBuffer after a few puts


Another of the buffer's important attributes is its limit. The limit is the first element that should not be read or written. Attempting to put() past the limit causes a BufferOverflowException. Similarly, attempting to get() past the limit causes a BufferUnderflowException. For a new buffer, the limit equals the capacity. There is a trick to using buffers. Between filling the buffer with data and writing it on a Channel, the buffer must flip. Flipping a buffer primes it for a new sequence of operations. If you've been putting data into a buffer, flipping it ensures that it's ready to read the data. More precisely, flipping the buffer sets its limit to the current position and then resets its position to zero. Its capacity does not change. The following code flips the buffer. Figure 5 depicts the effect of flipping the sample buffer.
example.flip();



Figure 5. The flipped ByteBuffer
After the flip, the buffer can be read. In this example, get() returns four bytes before it throws a BufferUnderflowException.

An aside about byte ordering

Any data type larger than a byte must be stored in multiple bytes. A short (16 bits) requires two bytes, while an int (32 bits) requires four bytes. For a variety of historical reasons, different CPU architectures pack these bytes differently. On big-endian architectures, the most significant byte goes in the lowest address, as shown in Figure 6. Big-endian order is often referred to as network order.

Figure 6. Big-endian byte ordering
Little-endian architectures put the least significant byte first, as in Figure 7.

Figure 7. Little-endian byte ordering


Anyone who programs networks in C or C++ can rant at length about byte-ordering problems. Host byte order, network byte order, big endian, little endian ... they're a pain. If you put a short into a byte array in big-endian ordering and remove it in little-endian ordering, you receive a different number than you put in! (See Figure 8.)

Figure 8. The result of mismatched byte ordering
You might have noticed that the call to example.putShort() illustrated in Figure 4 resulted in 0xFE at Position 1 and 0xBA at Position 2. In other words, the most significant byte went into the lowest numbered slot. Therefore, Figure 4 offers an example of big-endian byte ordering. java.nio.ByteBuffer defaults to big-endian byte ordering on all machines, no matter what the underlying CPU might use. (In fact, Intel microprocessors are little endian.) ByteBuffer uses instances of java.nio.ByteOrder to determine its byte ordering. The static constants ByteOrder.BIG_ENDIAN and ByteOrder.LITTLE_ENDIAN do exactly what you would expect.
Essentially, if you talk to another Java program, leave the byte ordering alone and it will work. If you talk to a well-behaved socket application in any language, you should also leave the byte ordering alone. You fiddle with byte ordering in only two instances: when you talk to a poorly-behaved network application that does not respect network byte ordering, or when you deal with binary data files created on a little-endian machine.

How do buffers help?



So how can buffers improve performance and cut down on garbage? You could create a pool of direct Buffers to avoid allocations during request processing. Or you could create Buffers for common situations and keep them around. The following fragment from our sample HTTP server illustrates the latter approach:
class ReadWriteThread extends Thread {
  ...
  private WeakHashMap fileCache = new WeakHashMap();
  private ByteBuffer[] responseBuffers = new ByteBuffer[2];
  ...
  public ReadWriteThread(Selector readSelector, 
                         ConnectionList acceptedConnections, 
                         File dir) 
      throws Exception 
  {
    super("Reader-Writer");
    ...
    responseBuffers[0] = initializeResponseHeader();
    ...
  }
  ...
  protected ByteBuffer initializeResponseHeader() throws Exception {
    // Pre-load a "good" HTTP response as characters.
    CharBuffer chars = CharBuffer.allocate(88);
    chars.put("HTTP/1.1 200 OK\n");
    chars.put("Connection: close\n");
    chars.put("Server: Java New I/O Example\n");
    chars.put("Content-Type: text/html\n");
    chars.put("\n");
    chars.flip();
    // Translate the Unicode characters into ASCII bytes.
    ByteBuffer buffer = ascii.newEncoder().encode(chars);
    ByteBuffer directBuffer = ByteBuffer.allocateDirect(buffer.limit());
    directBuffer.put(buffer);
    return directBuffer;
  }
  ...
}
The above code is an excerpt from the thread that reads requests and sends responses. In the constructor, we set up two ByteBuffers for the responses. The first buffer always contains the HTTP response header. This particular server always sends the same headers and the same response code. To send error responses, the method sendError() (not shown above) creates a similar buffer with an HTTP error response for a particular status code. It saves the error response headers in a WeakHashMap, keyed by the HTTP status code.

The initializeResponseHeader() method actually uses three buffers. It fills a CharBuffer with Strings. The character set encoder turns the Unicode strings into bytes. I will cover character conversion later. Since this header is sent at the beginning of every response from the server, it saves time to create the response once, save it in a buffer, and just send the buffer every time. Notice the call to flip the CharBuffer after we put our data into it. The third buffer used in initializeResponseHeader() seems a bit odd. Why convert the characters into a ByteBuffer just to then copy them into another ByteBuffer? The answer: because CharsetEncoder creates a nondirect ByteBuffer. When you write a direct buffer to a channel, it immediately passes to native calls. However, when you pass a nondirect buffer to a channel, the channel provider creates a new, direct buffer and copies the nondirect buffer's contents. That means extra garbage and a data copy. It worsens when the buffer with the response header is sent in every HTTP response. Why let the channel provider create a direct buffer on every request if we can do it once and get it over with?

Character encoding

When putting data into ByteBuffers, two related problems crop up: byte ordering and character conversion. ByteBuffer handles byte ordering internally using the ByteOrder class. It does not deal with character conversion, however. In fact, ByteBuffer doesn't even have methods for reading or writing strings. Character conversion is a complicated topic, subject to many international standards, including the Internet Engineering Task Force's requests for comments, the Unicode Standard, and the Internet Assigned Numbers Authority (IANA). However, almost every time you deal with character conversion, you must convert Unicode strings to either ASCII or UTF-8. Fortunately, these are easy cases to handle. ASCII and UTF-8 are examples of character sets. A character set defines a mapping from Unicode to bytes and back again. Character sets are named according to IANA standards. In Java, a character set is represented by an instance of java.nio.charset.Charset. As with most internationalization classes, you do not construct Charsets directly. Instead, you use the static factory method Charset.forName() to acquire an appropriate instance. Charset.availableCharsets() gives you a map of supported character set names and their Charset instances. The J2SE 1.4 beta includes eight character sets: US-ASCII, ISO-8859-1, ISO-8859-15, UTF-8, UTF-16, UTF-16BE (big endian), UTF-16LE (little endian), and Windows-1252.

Charset constructs CharsetEncoders and CharsetDecoders to convert character sequences into bytes and back again. Take another look at ReadWriteThread below. The encoder shows up twice for converting an entire CharBuffer into a ByteBuffer. readRequest, on the other hand, uses the decoder on the incoming request.
class ReadWriteThread extends Thread {
  ...
  private Charset ascii;
  ...
  public ReadWriteThread(Selector readSelector, 
                         ConnectionList acceptedConnections, 
                         File dir) 
      throws Exception 
  {
    super("Reader-Writer");
    ...
    ascii = Charset.forName("US-ASCII");
    responseBuffers[0] = initializeResponseHeader();
    ...
  }
  ...
  protected ByteBuffer initializeResponseHeader() throws Exception {
    ...
    // Translate the Unicode characters into ASCII bytes.
    ByteBuffer buffer = ascii.newEncoder().encode(chars);
    ...
  }
  ...
  protected String readRequest(SelectionKey key) throws Exception {
    SocketChannel incomingChannel = (SocketChannel)key.channel();
    Socket incomingSocket = incomingChannel.socket();
    ...
    int bytesRead = incomingChannel.read(readBuffer);
    readBuffer.flip();
    String result = asciiDecoder.decode(readBuffer).toString();
    readBuffer.clear();
    StringBuffer requestString = (StringBuffer)key.attachment();
    requestString.append(result);
    ...
  }
  ...
  protected void sendError(SocketChannel channel, 
                           RequestException error) throws Exception {
      ...
      // Translate the Unicode characters into ASCII bytes.
      buffer = ascii.newEncoder().encode(chars);
      errorBufferCache.put(error, buffer);
      ...
  }
}

Channel the new way



You might notice that none of the existing java.io classes know how to read or write Buffers. In Merlin, Channels read data into Buffers and send data from Buffers. Channels join Streams and Readers as a key I/O construct. A channel might be thought of as a connection to some device, program, or network. At the top level, the java.nio.channels.Channel interface just knows whether it is open or closed. A nifty feature of Channel is that one thread can be blocked on an operation, and another thread can close the channel. When the channel closes, the blocked thread awakens with an exception indicating that the channel closed. There are several Channel classes, as shown in Figure 9.

Figure 9. Channel interface hierarchy
Additional interfaces depicted in Figure 9 add methods for reading (java.nio.channels.ReadableByteChannel), writing (java.nio.channels.WritableByteChannel), and scatter/gather operations. A gathering write can write data from several buffers to the channel in one contiguous operation. Conversely, a scattering read can read data from the channel and deposit it into several buffers, filling each one in turn to its limit. Scatter/gather operations have been used for years in high-performance I/O managers in Unix and Windows NT. SCSI controllers also employ scatter/gather to improve overall performance. In Java, the channels quickly pass scatter/gather operations down to the native operating system functions for vectored I/O. Scatter/gather operations also ease protocol or file handling, particularly when you create fixed headers in some buffers and change only one or two variable data buffers. You can configure channels for blocking or nonblocking operations. When blocking, calls to read, write, or other operations do not return until the operation completes. Large writes over a slow socket can take a long time. In nonblocking mode, a call to write a large buffer over a slow socket would just queue up the data (probably in an operating system buffer, though it could even queue it up in a buffer on the network card) and return immediately. The thread can move on to other tasks while the operating system's I/O manager finishes the job. Similarly, the operating system always buffers incoming data until the application asks for it. When blocking, if the application asks for more data than the operating system has received, the call blocks until more data comes in. In nonblocking mode, the application just gets whatever data is immediately available. The sample code included with this article uses each of the following three channels at various times:
  • ServerSocketChannel
  • SocketChannel
  • FileChannel


ServerSocketChannel

java.nio.channels.ServerSocketChannel plays the same role as java.net.ServerSocket. It creates a listening socket that accepts incoming connections. It cannot read or write. ServerSocketChannel.socket() provides access to the underlying ServerSocket, so you can still set socket options that way. As is the case with all the specific channels, you do not construct ServerSocketChannel instances directly. Instead, use the ServerSocketChannel.open() factory method.
ServerSocketChannel.accept() returns a java.nio.channel.SocketChannel for a newly connected client. (Note: Before Beta 3, accept() returned a java.net.Socket. Now the method returns a SocketChannel, which is less confusing for developers.) If the ServerSocketChannel is in blocking mode, accept() won't return until a connection request arrives. (There is an exception: you can set a socket timeout on the ServerSocket. In that case, accept() eventually throws a TimeoutException.) If the ServerSocketChannel is in nonblocking mode, accept() always returns immediately with either a Socket or null. In the sample code, AcceptThread constructs a ServerSocketChannel called ssc and binds it to a local TCP port:
class AcceptThread extends Thread {
  private ServerSocketChannel ssc;
  public AcceptThread(Selector connectSelector, 
                      ConnectionList list, 
                      int port) 
      throws Exception 
  {
    super("Acceptor");
    ...
    ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false);
    InetSocketAddress address = new InetSocketAddress(port);
    ssc.socket().bind(address);
    ...
  }

SocketChannel

java.nio.channels.SocketChannel is the real workhorse in this application. It encapsulates a java.net.Socket and adds a nonblocking mode and a state machine.

SocketChannels can be created one of two ways. First, SocketChannel.open() creates a new, unconnected SocketChannel. Second, the Socket returned by ServerSocketChannel.accept() actually has an open and connected SocketChannel attached to it. This code fragment, from AcceptThread, illustrates the second approach to acquiring a SocketChannel:
class AcceptThread extends Thread {
  private ConnectionList acceptedConnections;
  ...
  protected void acceptPendingConnections() throws Exception {
    ...
    for(Iterator i = readyKeys.iterator(); i.hasNext(); ) {
      ...
      ServerSocketChannel readyChannel = (ServerSocketChannel)key.channel();
      SocketChannel incomingChannel = readyChannel.accept();
      acceptedConnections.push(incomingChannel);
    }
  }
}
Like SelectableChannel's other subclasses, SocketChannel can be blocking or nonblocking. If it is blocking, then read and write operations on the SocketChannel behave exactly like blocking reads and writes on a Socket, with one vital exception: these blocking reads and writes can be interrupted if another thread closes the channel.

FileChannel

Unlike SocketChannel and ServerSocketChannel, java.nio.channels.FileChannel does not derive from SelectableChannel. As you will see in the next section, that means that FileChannels cannot be used for nonblocking I/O. Nevertheless, FileChannel has a slew of sophisticated features that were previously reserved for C programmers. FileChannels allow locking of file portions and direct file-to-file transfers that use the operating system's file cache. FileChannel can also map file regions into memory. Memory mapping a file uses the native operating system's memory manager to make a file's contents look like memory locations. For more efficient mapping, the operating system uses its disk paging system. From the application's perspective, the file contents just exist in memory at some range of addresses. When it maps a file region into memory, FileChannel creates a MappedByteBuffer to represent that memory region. MappedByteBuffer is a type of direct byte buffer. A MappedByteBuffer offers two big advantages. First, reading memory-mapped files is fast. The biggest gains go to sequential access, but random access also speeds up. The operating system can page the file into memory far better than java.io.BufferedInputStream can do its block reads. The second advantage is that using MappedByteBuffers to send files is simple, as shown in the next code fragment, also from ReadWriteThread:
protected void sendFile(String uri, SocketChannel channel) throws 
RequestException, IOException {
    if(Server.verbose) 
      System.out.println("ReadWriteThread: Sending " + uri);
    Object obj = fileCache.get(uri);
    
    if(obj == null) {
      Server.statistics.fileMiss();
      try {
            File f = new File(baseDirectory, uri);
            FileInputStream fis = new FileInputStream(f);
            FileChannel fc = fis.getChannel();
            
            int fileSize = (int)fc.size();
            responseBuffers[1] = fc.map(FileChannel.MapMode.READ_ONLY, 0, fileSize);
            fileCache.put(uri, responseBuffers[1]);
      } catch(FileNotFoundException fnfe) {
            throw RequestException.PAGE_NOT_FOUND;
      }
    } else {
      Server.statistics.fileHit();
      responseBuffers[1] = (MappedByteBuffer)obj;
      responseBuffers[1].rewind();
    }
    responseBuffers[0].rewind();
    channel.write(responseBuffers);
  }
The sendFile() method sends a file as an HTTP response. The lines inside the try block create the MappedByteBuffer. The rest of the method caches the memory-mapped file buffers in a WeakHashMap. That way, repeated requests for the same file are blindingly fast, yet when memory tightens, the garbage collector eliminates the cached files. You could keep the buffers in a normal HashMap, but only if you know that the file number is small (and fixed). Notice that the call to channel.write() actually passes an array of two ByteBuffers (one direct, one mapped). Passing two buffers makes the call a gathering write operation. The first buffer is fixed to contain the HTTP response code, headers, and body separator. The second buffer is the memory-mapped file. The channel sends the entire contents of the first buffer (the response header) followed by the entire contents of the second buffer (the file data).

The bridge to the old world



Before moving on to nonblocking operations, you should investigate the class java.nio.channels.Channels. Channels allows new I/O channels to interoperate with old I/O streams and readers. Channels has static methods that can create a channel from a stream or reader or vice versa. It proves most useful when you deal with third-party packages that expect streams, such as XML parsers.

Selectors

In the old days of blocking I/O, you always knew when you could read or write to a stream, because your call would not return until the stream was ready. Now, with nonblocking channels, you need some other way to tell when a channel is ready. In the new I/O packages, Selectors serve that purpose.

In Pattern Oriented Software Architecture, Volume 2, by Douglas Schmidt, Michael Stal, Hans Rohnert, and Frank Buschmann (John Wiley & Son Ltd, 1996), the authors present a pattern called Reactor. Reactor allows applications to decouple event arrival from event handling. Events arrive at arbitrary times but are not immediately dispatched. Instead, a Reactor keeps track of the events until the handlers ask for them.
A java.nio.channels.Selector plays the role of a Reactor. A Selector multiplexes events on SelectableChannels. In other words, a Selector provides a rendezvous point between I/O events on channels and client applications. Each SelectableChannel can register interest in certain events. Instead of notifying the application when the events happen, the channels track the events. Later, when the application calls one of the selection methods on the Selector, it polls the registered channels to see if any interesting events have occurred. Figure 10 depicts an example of a selector with two registered channels.

Figure 10. A selector polling its channels
Channels only register for operations they have interest in. Not every channel supports every operation. SelectionKey defines all possible operation bits, which are used twice. First, when the application registers the channel by calling SelectableChannel.register(Selector sel, int operations), it passes the sum of the desired operations as the second argument. Then, once a SelectionKey has been selected, the SelectionKey's readyOps() method returns the sum of all the operation bits that its channel is ready to perform. SelectableChannel.validOps() returns the allowed operations for each channel. Attempting to register a channel for operations it doesn't support results in an IllegalArgumentException. The following table lists the valid operations for each concrete subclass of SelectableChannel:
ServerSocketChannel OP_ACCEPT
SocketChannel OP_CONNECT, OP_READ, OP_WRITE
DatagramChannel OP_READ, OP_WRITE
Pipe.SourceChannel OP_READ
Pipe.SinkChannel OP_WRITE


Table 2. SelectableChannels and their valid operations


A channel can register for different operation sets on different selectors. When the operating system indicates that a channel can perform one of the valid operations that it registered for, the channel is ready. On each selection call, a selector undergoes a series of actions. First, every key cancelled since the last selection drops from the selector's key set. A key can be cancelled by explicitly calling SelectionKey.cancel(), by closing the key's channel, or by closing the key's selector. Keys can be cancelled asynchronously -- even while the selector is blocking. Second, the selector checks each channel to see if it's ready. If it is, then the selector adds the channel's key to the ready set. When a key is in the ready set, the key's readyOps() method always returns a set of operations that the key's channel can perform. If the key was already in the ready set before this call to select(), then the new operations are added to the key's readyOps(), so that the key reflects all the available operations.
Next, if any keys have cancelled while the operating system checks are underway, they drop from the ready set and the registered key set.
Finally, the selector returns the number of keys in its ready set. The set itself can be obtained with the selectedKeys() method. If you call Selector.selectNow() and no channels are ready, then selectNow() just returns zero. On the other hand, if you call Selector.select() or Selector.select(int timeout), then the selector blocks until at least one channel is ready or the timeout is reached. Selectors should be familiar to Unix or Win32 system programmers, who will recognize them as object-oriented versions of select() or WaitForSingleEvent(). Before Merlin, asynchronous I/O was the domain of C or C++ programmers; now it is available to Java programmers too. See the sidebar, "Is the New I/O Too Platform-Specific?", for a discussion of why Java is just now acquiring asynchronous I/O.

Applying selectors



The sample application uses two selectors. In AcceptThread, the first selector just handles the ServerSocketChannel:
class AcceptThread extends Thread {
  private ServerSocketChannel ssc;
  private Selector connectSelector;
  private ConnectionList acceptedConnections;
  public AcceptThread(Selector connectSelector, 
                      ConnectionList list, 
                      int port) 
      throws Exception 
  {
    super("Acceptor");
    this.connectSelector = connectSelector;
    this.acceptedConnections = list;
    ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false);
    InetSocketAddress address = new InetSocketAddress(port);
    ssc.socket().bind(address);
    ssc.register(this.connectSelector, SelectionKey.OP_ACCEPT);
  }
   
  public void run() {
    while(true) {
      try {
        connectSelector.select();
        acceptPendingConnections();
      } catch(Exception ex) {
        ex.printStackTrace();
      }
    }
  }
  protected void acceptPendingConnections() throws Exception {
    Set readyKeys = connectSelector.selectedKeys();
    for(Iterator i = readyKeys.iterator(); i.hasNext(); ) {
      SelectionKey key = (SelectionKey)i.next();
      i.remove();
      ServerSocketChannel readyChannel = (ServerSocketChannel)key.channel();
      SocketChannel incomingChannel = readyChannel.accept();
      acceptedConnections.push(incomingChannel);
    }
  }
}
AcceptThread uses connectSelector to detect incoming connection attempts. Whenever the selector indicates that the ServerSocketChannel is ready, there must be a connection attempt. AcceptThread.acceptPendingConnections() iterates through the selected keys (there can be only one) and removes it from the set. Thanks to the selector, we know that the call to ServerSocketChannel.accept() returns immediately. We can get a SocketChannel -- representing a client connection -- from the new Socket. That new channel passes to ReadWriteThread, by way of a FIFO (first in, first out) queue.

ReadWriteThread uses readSelector to find out when a request has been received. Because it is only a sample, our server application assumes that all requests arrive in a single TCP packet. That is not a good assumption for a real Web server. Other code samples have already shown ReadWriteThread's buffer management, file mapping, and response sending, so this listing contains only the selection code:
class ReadWriteThread extends Thread {
  private Selector readSelector;
  private ConnectionList acceptedConnections;
  ...
  public void run() {
    while(true) {
      try {
        registerNewChannels();
        int keysReady = readSelector.select();
        if(keysReady > 0) {
          acceptPendingRequests();
        }
      } catch(Exception ex) {
        ex.printStackTrace();
      }
    }
  }
  protected void registerNewChannels() throws Exception {
    SocketChannel channel;
    while(null != (channel = acceptedConnections.removeFirst())) {
      channel.configureBlocking(false);
      channel.register(readSelector, SelectionKey.OP_READ, new StringBuffer());
    }  
  }
  protected void acceptPendingRequests() throws Exception {
    Set readyKeys = readSelector.selectedKeys();
    for(Iterator i = readyKeys.iterator(); i.hasNext(); ) {
      SelectionKey key = (SelectionKey)i.next();
      i.remove();
      SocketChannel incomingChannel = (SocketChannel)key.channel();
      Socket incomingSocket = incomingChannel.socket();
      ...
            String path = readRequest(incomingSocket);
            sendFile(path, incomingChannel);
      ...
    }
  }


The main loops of each thread resemble each other, with the main difference being that ReadWriteThread registers for OP_READ, while AcceptThread registers for OP_ACCEPT. Naturally, the ways in which the respective events are handled differ; overall, however, both threads are instances of the Reactor pattern.
The third argument of register() is an attachment to the SelectionKey, which can be any object. The key holds on to the attachment for later use. Here, the attachment is a StringBuffer that readRequest uses to receive the incoming HTTP request. Each time readRequest reads a Buffer from the socket, it decodes the buffer and appends it to the request string. Once the request string is finished, readRequest calls handleCompleteRequest to parse the request and send the response.

A few gotchas

You might encounter a few tricky points when using selectors. First, although selectors are thread safe, their key sets are not. When Selector.selectedKeys() is called, it actually returns the Set that the selector uses internally. That means that any other selection operations (perhaps called by other threads) can change the Set. Second, once the selector puts a key in the selected set, it stays there. The only time a selector removes keys from the selected set is when the key's channel closes. Otherwise, the key stays in the selected set until it is explicitly removed. Third, registering a new channel with a selector that is already blocking does not wake up the selector. Although the selector appears as if it misses events, it will detect the events with the next call to select(). Fourth, a selector can have only 63 channels registered, which is probably not a big deal.

No comments:

Post a Comment