Wednesday, September 16, 2009

The Anatomy of Hadoop I/O Pipeline


In a typical Hadoop MapReduce job, input files are read from HDFS.
Data are usually compressed to reduce the file sizes. After
decompression, serialized bytes are transformed into Java objects
before being passed to a user-defined map() function. Conversely,
output records are serialized, compressed, and eventually pushed back
to HDFS. This seemingly simple, two-way process is in fact much more
complicated for a few reasons:

  • Compression and decompression are typically done through native
    library code.
  • End-to-end CRC32 checksum is always verified or calculated
    during reading or writing.
  • Buffer management is complicated due to various interface
    constraints.

In this blog post, I attempt to decompose and analyze the Hadoop I/O
pipeline in detail, and explore possible optimizations. To keep the
discussion concrete, I am going to use the ubiquitous example of
reading and writing line records from/to gzip-compressed text files. I
will not get into details on the DataNode side of the pipeline, and
instead mainly focus on the client-side (the map/reduce task
processes). Finally, all descriptions are based on Hadoop 0.21 trunk at
the time of this writing, which means you may see things differently if
you are using older or newer versions of Hadoop.

Reading Inputs

Figure 1 illustrates the I/O pipeline when reading line records from a
gzipped text file using TextInputFormat. The figure is divided into two
sides separated by a thin gap. The left side shows the DataNode
process, and the right side the application process (namely, the Map
task). From bottom to top, there are three zones where buffers are
allocated or manipulated: kernel space, native code space, and JVM
space. For the application process, from left to right, there are the
software layers that a data block needs to traverse through. Boxes with
different colors are buffers of various sorts. An arrow between two
boxes represents a data transfer or buffer-copy. The weight of an arrow
indicates the amount of data being transferred. The label in each box
shows the rough location of the buffer (either the variable that
references the buffer, or the module where the buffer is allocated).
If available, the size of a buffer is described in square brackets. If
the buffer size is configurable, then both the configuration property
and the default size are shown. I tag each data transfer with the
numeric step where the transfer happens:

Figure 1: Reading line records from gzipped text files.

  1. Data transferred from DataNode to MapTask process. DBlk is the
    file data block; CBlk is the file checksum block. File data are
    transferred to the client through Java nio transferTo (aka UNIX
    sendfile syscall). Checksum data are first fetched to DataNode JVM
    buffer, and then pushed to the client (details are not shown). Both
    file data and checksum data are bundled in an HDFS packet
    (typically 64KB) in the format of: {packet header | checksum bytes
    | data bytes}.
  2. Data received from the socket are buffered in a
    BufferedInputStream, presumably for the purpose of reducing the
    number of syscalls to the kernel. This actually involves two
    buffer-copies: first, data are copied from kernel buffers into a
    temporary direct buffer in JDK code; second, data are copied from
    the temporary direct buffer to the byte[] buffer owned by the
    BufferedInputStream. The size of the byte[] in BufferedInputStream
    is controlled by the configuration property 'io.file.buffer.size',
    and defaults to 4K. In our production environment, this parameter is
    customized to 128K.
  3. Through the BufferedInputStream, the checksum bytes are saved
    into an internal ByteBuffer (whose size is roughly (PacketSize /
    512 * 4) or 512B), and file bytes (compressed data) are deposited
    into the byte[] buffer supplied by the decompression layer. Since
    the checksum calculation requires a full 512 byte chunk while a
    user's request may not be aligned with a chunk boundary, a 512B
    byte[] buffer is used to align the input before copying partial
    chunks into user-supplied byte[] buffer. Also note that data are
    copied to the buffer in 512-byte pieces (as required by
    FSInputChecker API). Finally, all checksum bytes are copied to a
    4-byte array for FSInputChecker to perform checksum verification.
    Overall, this step involves an extra buffer-copy.
  4. The decompression layer uses a byte[] buffer to receive data
    from the DFSClient layer. The DecompressorStream copies the data
    from the byte[] buffer to a 64K direct buffer, calls the native
    library code to decompress the data and stores the uncompressed
    bytes in another 64K direct buffer. This step involves two
    buffer-copies.
  5. LineReader maintains an internal buffer to absorb data from the
    downstream. From the buffer, line separators are discovered and
    line bytes are copied to form Text objects. This step requires two
    buffer-copies.
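The layering in steps 4 and 5 can be sketched with plain JDK streams. In this sketch, GZIPInputStream stands in for Hadoop's DecompressorStream and BufferedReader.readLine() for LineReader; the DFSClient and FSInputChecker layers are elided, and all class and method names are illustrative, not Hadoop's API:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// JDK-only stand-ins for the layers above: a decompressor buffer feeding
// a line buffer, with one copy per record to materialize each line.
public class ReadPipelineSketch {

    // Compress a string into an in-memory "file".
    static byte[] gzip(String text) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (Writer w = new OutputStreamWriter(new GZIPOutputStream(bos), "UTF-8")) {
                w.write(text);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Read side: compressed bytes -> decompressor buffer -> line buffer
    // -> one String object per record (the per-record copy of step 5).
    static int countLines(byte[] gzipped) {
        try (BufferedReader r = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(gzipped)), "UTF-8"))) {
            int lines = 0;
            while (r.readLine() != null) {
                lines++;
            }
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] gz = gzip("line one\nline two\nline three\n");
        System.out.println("lines = " + countLines(gz));  // prints "lines = 3"
    }
}
```

Every stream boundary in this chain is a potential buffer-copy, which is exactly why the real pipeline accumulates so many of them.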

Optimizing Input Pipeline

Adding everything up, including a 'copy' for decompressing bytes,
the whole read pipeline involves seven buffer-copies to deliver a
record to MapTask's map() function, counting from the moment data are
received in the process's kernel buffer. There are a couple of things
that could be
improved in the above process:

  • Many buffer-copies are needed simply to convert between direct
    buffer and byte[] buffer.
  • Checksum calculation can be done in bulk instead of one chunk
    at a time.
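The bulk-checksum idea can be illustrated with the JDK's own CRC32 class: rather than verifying one 512B chunk per call the way FSInputChecker does, all of a packet's chunk checksums are recomputed and compared in a single pass. The chunk size matches HDFS's 512-byte granularity, but the class and method names here are illustrative, not Hadoop's:

```java
import java.util.Arrays;
import java.util.zip.CRC32;

// Bulk verification of per-chunk CRC32 checksums: HDFS stores one 4-byte
// checksum per 512-byte chunk; verifying a whole packet at once avoids a
// per-chunk call into the checker layer.
public class BulkChecksum {
    static final int CHUNK = 512;

    // One CRC32 per 512-byte chunk, as stored alongside HDFS data.
    static long[] perChunkCrcs(byte[] data) {
        int n = (data.length + CHUNK - 1) / CHUNK;
        long[] crcs = new long[n];
        CRC32 crc = new CRC32();
        for (int i = 0; i < n; i++) {
            crc.reset();
            int off = i * CHUNK;
            crc.update(data, off, Math.min(CHUNK, data.length - off));
            crcs[i] = crc.getValue();
        }
        return crcs;
    }

    // Verify every chunk of a packet in one bulk call.
    static boolean verifyBulk(byte[] data, long[] expected) {
        return Arrays.equals(perChunkCrcs(data), expected);
    }

    public static void main(String[] args) {
        byte[] packet = new byte[64 * 1024];
        new java.util.Random(42).nextBytes(packet);
        long[] sums = perChunkCrcs(packet);
        System.out.println(verifyBulk(packet, sums));   // prints "true"
        packet[100] ^= 1;                               // corrupt one byte
        System.out.println(verifyBulk(packet, sums));   // prints "false"
    }
}
```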

Figure 2: Optimizing input pipeline.

Figure 2 shows the post-optimization view where the total number of
buffer copies is reduced from seven to three:

  1. An input packet is decomposed into the checksum part and the
    data part, which are scattered into two direct buffers: an internal
    one for checksum bytes, and the direct buffer owned by the
    decompression layer to hold compressed bytes. The FSInputChecker
    accesses both buffers directly.
  2. The decompression layer deposits the uncompressed bytes into a
    direct buffer owned by the LineReader.
  3. LineReader scans the bytes in the direct buffer, finds the line
    separators from the buffer, and constructs Text objects.
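The optimized step 3 can be sketched as a scan over a direct buffer: line separators are found in place, and each record is materialized in a single copy. Hadoop would build Text objects rather than Strings; the names below are hypothetical:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Scan a direct buffer (as filled by the decompression layer in the
// optimized pipeline) for '\n' separators, copying each complete line out
// exactly once. Bytes after the last separator are left in the buffer,
// as LineReader would wait for more data.
public class DirectLineScan {
    static List<String> scan(ByteBuffer buf) {
        List<String> lines = new ArrayList<>();
        int start = buf.position();
        for (int i = buf.position(); i < buf.limit(); i++) {
            if (buf.get(i) == '\n') {
                byte[] line = new byte[i - start];   // the single buffer-copy
                for (int j = 0; j < line.length; j++) {
                    line[j] = buf.get(start + j);    // absolute gets; direct
                }                                    // buffers have no array()
                lines.add(new String(line, StandardCharsets.UTF_8));
                start = i + 1;
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(64);
        direct.put("foo\nbar\n".getBytes(StandardCharsets.UTF_8)).flip();
        System.out.println(scan(direct));  // prints "[foo, bar]"
    }
}
```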

Writing Outputs

Now let's shift gears and look at the write-side of the story.
Figure 3 illustrates the I/O pipeline when a ReduceTask writes line
records into a gzipped text file using TextOutputFormat. Similar to
Figure 1, each data transfer is tagged with the numeric step where the
transfer occurs:

Figure 3: Writing line records into gzipped text files.

  1. TextOutputFormat's RecordWriter is unbuffered. When a user
    emits a line record, the bytes of the Text object are copied
    straight into a 64KB direct buffer owned by the compression layer.
    For a very long line, it is copied to this buffer 64KB at a
    time, over multiple iterations.
  2. Every time the compression layer receives a line (or part of a
    very long line), the native compression code is called, and
    compressed bytes are stored into another 64KB direct buffer. Data
    are then copied from that direct buffer to an internal byte[]
    buffer owned by the compression layer before pushing down to the
    DFSClient layer because the DFSClient layer only accepts byte[]
    buffer as input. The size of this buffer is again controlled by
    configuration property 'io.file.buffer.size'. This step involves
    two buffer-copies.
  3. FSOutputSummer calculates the CRC32 checksum from the byte[]
    buffer from the compression layer, and deposits both data bytes and
    checksum bytes into a byte[] buffer in a Packet object. Again,
    checksum calculation must be done on whole 512B chunks, and an
    internal 512B byte[] buffer is used to hold partial chunks that may
    result from compressed data not aligned with chunk boundaries.
    Checksums are first calculated and stored in a 4B byte[] buffer
    before being copied to the packet. This step involves one
    buffer-copy.
  4. When a packet is full, the packet is pushed to a queue whose
    length is limited to 80. The size of the packet is controlled by
    the configuration property 'dfs.write.packet.size' and defaults to
    64KB. This step involves no buffer-copy.
  5. A DataStreamer thread waits on the queue and sends the packet
    to the socket whenever it receives one. The socket is wrapped with
    a BufferedOutputStream. But the byte[] buffer is very small (no
    more than 512B) and it is usually bypassed. The data, however,
    still need to be copied to a temporary direct buffer owned by JDK
    code. This step requires two data copies.
  6. Data are sent from the ReduceTask's kernel buffer to the
    DataNode's kernel buffer. Before the data are stored in Block files
    and checksum files, there are a few buffer-copies in DataNode side.
    Unlike the case of DFS read, both file data and checksum data will
    traverse out of kernel, and into JVM land. The details of this
    process are beyond the discussion here and are not shown in the
    figure.

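Steps 1 and 2 of the write path can likewise be sketched with plain JDK streams: record bytes flow into a compressor, and compressed bytes into a buffered byte[] stream, much as TextOutputFormat, the compression layer, and DFSClient do above (minus the checksum and packet layers). All names here are illustrative stand-ins:

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.zip.GZIPOutputStream;

// JDK-only sketch of the write-side layering: line bytes -> compressor
// buffer -> buffered byte[] stream (standing in for the DFSClient path).
public class WritePipelineSketch {

    static byte[] writeLines(String[] lines) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            try (Writer w = new OutputStreamWriter(
                    new GZIPOutputStream(new BufferedOutputStream(sink, 128 * 1024)),
                    "UTF-8")) {
                for (String line : lines) {
                    w.write(line);   // step 1: record bytes into the compressor
                    w.write('\n');
                }
            }                        // close() flushes and finishes the gzip stream
            return sink.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] gz = writeLines(new String[]{"alpha", "beta"});
        System.out.println(gz.length > 0);  // prints "true"
    }
}
```

Note the ordering: the BufferedOutputStream sits below the compressor, so what it buffers are compressed bytes, mirroring the 'io.file.buffer.size' byte[] buffer described in step 2.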
Optimizing Output Pipeline

Overall, including the 'copy' for compressing bytes, the process
described above requires six buffer-copies for an output line record to
reach ReduceTask's kernel buffer. What could we do to optimize the
write pipeline?

  • We can probably reduce a few buffer-copies.
  • The native compression code may be called less frequently if we
    call it only after the input buffer is full (block compression
    codecs like LZO already do this).
  • Checksum calculations can be done in bulk instead of one chunk
    at a time.
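The second bullet, deferring compression until the input buffer is full, can be sketched with the JDK's Deflater: records accumulate in a 64KB staging buffer, and the native deflate code runs once per full buffer (plus once at the end) instead of once per record. The buffer size and class names are illustrative, not Hadoop's CompressorStream:

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

// Accumulate records in a staging buffer and invoke the native
// compressor only when the buffer fills (or at finish), reducing the
// number of JNI calls for short records.
public class DeferredCompress {
    private final byte[] staging = new byte[64 * 1024];
    private int used = 0;
    private final Deflater deflater = new Deflater();
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
    int deflateCalls = 0;   // how many times the compressor was invoked

    void write(byte[] record) {
        int off = 0;
        while (off < record.length) {
            int n = Math.min(record.length - off, staging.length - used);
            System.arraycopy(record, off, staging, used, n);
            used += n;
            off += n;
            if (used == staging.length) compressStaged(false);  // only on full buffer
        }
    }

    byte[] finish() {
        compressStaged(true);
        return out.toByteArray();
    }

    private void compressStaged(boolean last) {
        deflater.setInput(staging, 0, used);
        if (last) deflater.finish();
        byte[] buf = new byte[8 * 1024];
        // drain: until input is consumed, or until the stream is finished
        while (last ? !deflater.finished() : !deflater.needsInput()) {
            int n = deflater.deflate(buf);
            out.write(buf, 0, n);
        }
        deflateCalls++;
        used = 0;
    }

    public static void main(String[] args) {
        DeferredCompress dc = new DeferredCompress();
        byte[] record = new byte[10 * 1024];            // 10KB records
        for (int i = 0; i < 20; i++) dc.write(record);  // 200KB total
        byte[] compressed = dc.finish();
        System.out.println(dc.deflateCalls);            // prints "4": 3 full buffers + finish
        System.out.println(compressed.length < 200 * 1024);
    }
}
```

Block codecs like LZO get the same effect for free, since they operate on whole blocks by design.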

Figure 4: Optimizing output pipeline.

Figure 4 shows how the pipeline looks after these optimizations, where a
total of four buffer-copies are necessary:

  1. Bytes from a user's Text object are copied to a direct buffer
    owned by the TextOutputFormat layer.
  2. Once this buffer is full, native compression code is called and
    compressed data is deposited to a direct buffer owned by the
    compression layer.
  3. FSOutputSummer computes the checksum for bytes in the direct
    buffer from the compression layer and saves both data bytes and
    checksum bytes into a packet's direct buffer.
  4. A full packet is pushed into a queue, and, in the background,
    the DataStreamer thread sends the packet through the socket, which
    copies the bytes into kernel buffers.
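The packet queue and streamer thread of step 4 can be sketched with a bounded BlockingQueue: full packets land in a queue capped at 80 (the limit named earlier), and a background thread drains them toward the socket. The "socket" here is just a byte counter, and the packet representation is illustrative:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Bounded packet queue plus a background streamer thread, mimicking the
// producer/consumer handoff between the write path and DataStreamer.
public class StreamerSketch {

    static long send(int packets, int packetSize) throws InterruptedException {
        BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(80);
        final long[] sent = {0};

        Thread streamer = new Thread(() -> {
            try {
                while (true) {
                    byte[] packet = queue.take();    // blocks, like DataStreamer
                    if (packet.length == 0) return;  // empty packet = shutdown
                    sent[0] += packet.length;        // stand-in for the socket write
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        streamer.start();

        for (int i = 0; i < packets; i++) {
            queue.put(new byte[packetSize]);  // producer blocks once 80 are queued
        }
        queue.put(new byte[0]);               // signal shutdown
        streamer.join();                      // join() makes sent[0] safely visible
        return sent[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(send(10, 64 * 1024));  // prints "655360"
    }
}
```

The bounded queue is what provides backpressure: a slow DataNode eventually blocks the producer rather than letting packets pile up without limit.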


This blog post came out of an afternoon spent asking ourselves
specific questions about Hadoop's I/O and validating the answers in the
code. It turns out, after combing through class after class, that the
pipeline is more complex than we originally thought. While each of us
is familiar with one or more components, we found the preceding,
comprehensive picture of Hadoop I/O illuminating, and we hope other
developers and users will, too. Effecting the optimizations outlined
above will be a daunting task, and this is the first step toward a more
performant Hadoop.
