  • 2. Architectural Overview
    • 1. Rich Buffer Data Structure
      • 1.1. Combining and Slicing ChannelBuffers
    • 2. Universal Asynchronous I/O API
    • 3. Event Model based on the Interceptor Chain Pattern
    • 4. Advanced Components for More Rapid Development
      • 4.1. Codec framework
      • 4.2. SSL / TLS Support
      • 4.3. HTTP Implementation
      • 4.4. WebSockets Implementation
      • 4.5. Google Protocol Buffer Integration
    • 5. Summary
  • 2. Frequently Asked Questions
    • 1. When can I write downstream data?
    • 2. How do I incorporate my blocking application code with the non-blocking NioServerSocketChannelFactory?
    • 3. Do I need to synchronize my handler code given that events can happen at the same time?
    • 4. How do I pass data between handlers in the same Channel?
Nowadays we use general-purpose applications or libraries to communicate with each other. For example, we often use an HTTP client library to retrieve information from a web server and to invoke a remote procedure call via web services.

However, a general-purpose protocol or its implementation sometimes does not scale very well. For instance, we do not use a general-purpose HTTP server to exchange huge files, e-mail messages, and near-realtime messages such as financial information and multiplayer game data. What is required is a highly optimized protocol implementation dedicated to a special purpose. For example, you might want to implement an HTTP server optimized for an AJAX-based chat application, media streaming, or large file transfer. You might even want to design and implement a whole new protocol tailored precisely to your needs.

Another inevitable case is when you have to deal with a legacy proprietary protocol to ensure interoperability with an old system. What matters in this case is how quickly we can implement that protocol without sacrificing the stability and performance of the resulting application.

The Netty project is an effort to provide an asynchronous event-driven network application framework and tooling for the rapid development of maintainable, high-performance, high-scalability protocol servers and clients.

In other words, Netty is a NIO client-server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server development.

'Quick and easy' does not mean that the resulting application will suffer from maintainability or performance issues. Netty has been designed carefully with the experience gained from implementing many protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded in finding a way to achieve ease of development, performance, stability, and flexibility without compromise.

Some users might already have found other network application frameworks that claim to have the same advantages, and you might want to ask what makes Netty so different from them. The answer is the philosophy on which it is built. Netty is designed to give you the most comfortable experience, both in terms of the API and the implementation, from day one. It is not something tangible, but you will realize that this philosophy makes your life much easier as you read this guide and play with Netty.

This chapter tours the core constructs of Netty with simple examples to let you get started quickly. You will be able to write a client and a server on top of Netty right away when you reach the end of this chapter.

If you prefer a top-down approach to learning, you might want to start from Chapter 2, Architectural Overview, and come back here.

The examples in this chapter have only two minimum requirements: the latest version of Netty and JDK 1.5 or above. The latest version of Netty is available on the project download page. To download the right version of the JDK, please refer to your preferred JDK vendor's web site.

As you read, you might have more questions about the classes introduced in this chapter. Please refer to the API reference whenever you want to know more about them. All class names in this document are linked to the online API reference for your convenience. Also, please don't hesitate to contact the Netty project community and let us know if there is any incorrect information, a grammatical error or typo, or if you have a good idea for improving the documentation.

To implement the DISCARD protocol, the only thing you need to do is to ignore all received data. Let us start straight from the handler implementation, which handles I/O events generated by Netty.
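
    package org.jboss.netty.example.discard;

    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.MessageEvent;
    import org.jboss.netty.channel.SimpleChannelHandler;

    public class DiscardServerHandler extends SimpleChannelHandler { // (1)
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { // (2)
            // Discard the received data silently by doing nothing.
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { // (3)
            e.getCause().printStackTrace();
            Channel ch = e.getChannel();
            ch.close();
        }
    }
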
(1) DiscardServerHandler extends SimpleChannelHandler, which is an implementation of ChannelHandler. SimpleChannelHandler provides various event handler methods that you can override. For now, it is enough to extend SimpleChannelHandler rather than to implement the handler interfaces yourself.

(2) We override the messageReceived event handler method here. This method is called with a MessageEvent, which contains the received data, whenever new data is received from a client. In this example, we implement the DISCARD protocol by doing nothing with the received data.

(3) The exceptionCaught event handler method is called with an ExceptionEvent when an exception is raised by Netty due to an I/O error, or by a handler implementation due to an exception thrown while processing events. In most cases, the caught exception should be logged and its associated channel should be closed here, although the implementation of this method can differ depending on how you want to deal with an exceptional situation. For example, you might want to send a response message with an error code before closing the connection.

So far so good. We have implemented the first half of the DISCARD server. What is left now is to write the main method which starts the server with the DiscardServerHandler.

    package org.jboss.netty.example.discard;

    import java.net.InetSocketAddress;
    import java.util.concurrent.Executors;

    import org.jboss.netty.bootstrap.ServerBootstrap;
    import org.jboss.netty.channel.ChannelFactory;
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

    public class DiscardServer {
        public static void main(String[] args) throws Exception {
            ChannelFactory factory =
                new NioServerSocketChannelFactory( // (4)
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool());

            ServerBootstrap bootstrap = new ServerBootstrap(factory); // (5)

            bootstrap.setPipelineFactory(new ChannelPipelineFactory() { // (6)
                public ChannelPipeline getPipeline() {
                    return Channels.pipeline(new DiscardServerHandler());
                }
            });

            bootstrap.setOption("child.tcpNoDelay", true); // (7)
            bootstrap.setOption("child.keepAlive", true);

            bootstrap.bind(new InetSocketAddress(8080)); // (8)
        }
    }

(4) ChannelFactory is a factory which creates and manages Channels and their related resources. It processes all I/O requests and performs I/O to generate ChannelEvents. Netty provides various ChannelFactory implementations. We are implementing a server-side application in this example, and therefore NioServerSocketChannelFactory is used. Note also that it does not create I/O threads by itself. It acquires threads from the thread pool you specify in the constructor, which gives you more control over how threads are managed in the environment where your application runs, such as an application server with a security manager.

(5) ServerBootstrap is a helper class that sets up a server. You can set up the server using a Channel directly. However, this is a tedious process and you do not need to do that in most cases.

(6) Here, we configure the ChannelPipelineFactory. Whenever a new connection is accepted by the server, a new ChannelPipeline will be created by the specified ChannelPipelineFactory. The new pipeline contains the DiscardServerHandler. As the application gets complicated, it is likely that you will add more handlers to the pipeline and eventually extract this anonymous class into a top-level class.

(7) You can also set parameters which are specific to the Channel implementation. We are writing a TCP/IP server, so we are allowed to set socket options such as tcpNoDelay and keepAlive. Please note that the "child." prefix was added to all options. It means the options will be applied to the accepted Channels rather than to the ServerSocketChannel itself. You could do the following to set the options of the ServerSocketChannel:
                                          

            bootstrap.setOption("reuseAddress", true);
(8) We are ready to go now. What is left is to bind to the port and to start the server. Here, we bind to port 8080 of all NICs (network interface cards) in the machine. You can call the bind method as many times as you want (with different bind addresses).

Now that we have written our first server, we need to test whether it really works. The easiest way to test it is to use the telnet command. For example, you could enter "telnet localhost 8080" in the command line and type something.

However, can we say that the server is working fine? We cannot really know, because it is a discard server. You will not get any response at all. To prove it is really working, let us modify the server to print what it has received.

We already know that a MessageEvent is generated whenever data is received, and that the messageReceived handler method will be invoked. Let us put some code into the messageReceived method of the DiscardServerHandler:

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        ChannelBuffer buf = (ChannelBuffer) e.getMessage(); // (9)
        while (buf.readable()) {
            System.out.println((char) buf.readByte());
            System.out.flush();
        }
    }

(9) It is safe to assume the message type in socket transports is always ChannelBuffer. ChannelBuffer is a fundamental data structure which stores a sequence of bytes in Netty. It is similar to NIO ByteBuffer, but it is easier to use and more flexible. For example, Netty allows you to create a composite ChannelBuffer which combines multiple ChannelBuffers, reducing the number of unnecessary memory copies.

Although it closely resembles NIO ByteBuffer, it is highly recommended that you refer to the API reference. Learning how to use ChannelBuffer correctly is a critical step in using Netty without difficulty.

The full source code of the discard server is located in the org.jboss.netty.example.discard package of the distribution.

So far, we have been consuming data without responding at all. A server, however, is usually supposed to respond to a request. Let us learn how to write a response message to a client by implementing the ECHO protocol, where any received data is sent back.

The only difference from the discard server we have implemented in the previous sections is that it sends the received data back instead of printing the received data out to the console. Therefore, it is enough again to modify the messageReceived method:

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        Channel ch = e.getChannel(); // (10)
        ch.write(e.getMessage());
    }

(10) A ChannelEvent object has a reference to its associated Channel. Here, the returned Channel represents the connection which received the MessageEvent. We can get the Channel and call the write method to write something back to the remote peer.

The full source code of the echo server is located in the org.jboss.netty.example.echo package of the distribution.

The protocol to implement in this section is the TIME protocol. It differs from the previous examples in that it sends a message containing a 32-bit integer without receiving any request, and closes the connection once the message is sent. In this example, you will learn how to construct and send a message, and how to close the connection on completion.

Because we are going to ignore any received data and send a message as soon as a connection is established, we cannot use the messageReceived method this time. Instead, we should override the channelConnected method. The following is the implementation:

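    package org.jboss.netty.example.time;

    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.buffer.ChannelBuffers;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelFuture;
    import org.jboss.netty.channel.ChannelFutureListener;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ChannelStateEvent;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.SimpleChannelHandler;

    public class TimeServerHandler extends SimpleChannelHandler {

        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { // (11)
            Channel ch = e.getChannel();
            ChannelBuffer time = ChannelBuffers.buffer(4); // (12)
            time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
            ChannelFuture f = ch.write(time); // (13)
            f.addListener(new ChannelFutureListener() { // (14)
                public void operationComplete(ChannelFuture future) {
                    Channel ch = future.getChannel();
                    ch.close();
                }
            });
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
            e.getCause().printStackTrace();
            e.getChannel().close();
        }
    }
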
(11) As explained, the channelConnected method will be invoked when a connection is established. Let us write the 32-bit integer that represents the current time in seconds here.

(12) To send a new message, we need to allocate a new buffer which will contain the message. We are going to write a 32-bit integer, and therefore we need a ChannelBuffer whose capacity is 4 bytes. The ChannelBuffers helper class is used to allocate a new buffer. Besides the buffer method, ChannelBuffers provides a lot of useful methods related to ChannelBuffer. For more information, please refer to the API reference.

It is also a good idea to use static imports for ChannelBuffers:

    import static org.jboss.netty.buffer.ChannelBuffers.*;
    ...
    ChannelBuffer dynamicBuf = dynamicBuffer(256);
    ChannelBuffer ordinaryBuf = buffer(1024);

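For comparison, here is a minimal sketch of the same 4-byte message built with the JDK's java.nio.ByteBuffer instead of a ChannelBuffer. It is not part of the Netty examples: the class name TimeValueSketch and its helper methods are made up for illustration. It uses the same 2208988800 offset as TimeServerHandler (the number of seconds between 1900 and 1970), and it shows that a plain NIO buffer has to be flipped after writing before anything can be read from it.

```java
import java.nio.ByteBuffer;
import java.util.Date;

class TimeValueSketch {
    // Seconds between 1900-01-01 and 1970-01-01, the constant used by TimeServerHandler.
    static final long SECONDS_1900_TO_1970 = 2208988800L;

    // Builds the 4-byte TIME message for the given Unix time in milliseconds.
    public static ByteBuffer encode(long unixMillis) {
        ByteBuffer buf = ByteBuffer.allocate(4);
        buf.putInt((int) (unixMillis / 1000L + SECONDS_1900_TO_1970));
        buf.flip(); // position -> 0, limit -> 4: only now can the content be read or sent
        return buf;
    }

    // Reads a 4-byte TIME message back into Unix time in milliseconds.
    public static long decode(ByteBuffer buf) {
        long secondsSince1900 = buf.getInt() & 0xFFFFFFFFL; // treat the 32 bits as unsigned
        return (secondsSince1900 - SECONDS_1900_TO_1970) * 1000L;
    }

    public static void main(String[] args) {
        ByteBuffer unflipped = ByteBuffer.allocate(4);
        unflipped.putInt(42);
        System.out.println("readable bytes without flip: " + unflipped.remaining());

        ByteBuffer message = encode(System.currentTimeMillis());
        System.out.println("readable bytes after flip: " + message.remaining());
        System.out.println("decoded: " + new Date(decode(message)));
    }
}
```

Without the flip() call, remaining() reports 0 readable bytes even though 4 bytes were written, which is the pitfall a ChannelBuffer avoids.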
But wait, where's the flip? Didn't we used to call ByteBuffer.flip() before sending a message in NIO? ChannelBuffer does not have such a method because it has two pointers: one for read operations and the other for write operations. The writer index increases when you write something to a ChannelBuffer while the reader index does not change. The reader index and the writer index represent where the message starts and ends respectively.

In contrast, an NIO buffer does not provide a clean way to figure out where the message content starts and ends without calling the flip method. You will be in trouble when you forget to flip the buffer because nothing or incorrect data will be sent. Such an error does not happen in Netty because it has a separate pointer for each operation type. You will find it makes your life much easier as you get used to it: a life without flipping out!

(13) Another point to note is that the write method returns a ChannelFuture. A ChannelFuture represents an I/O operation which has not yet occurred. That is, any requested operation might not have been performed yet because all operations are asynchronous in Netty. For example, the following code might close the connection even before a message is sent:

    Channel ch = ...;
    ch.write(message);
    ch.close();

Therefore, you need to call the close method only after the ChannelFuture returned by the write method notifies you that the write operation has completed. Please note that close also might not close the connection immediately, and it too returns a ChannelFuture.

(14) How do we get notified when the write request is finished, then? This is as simple as adding a ChannelFutureListener to the returned ChannelFuture. Here, we created a new anonymous ChannelFutureListener which closes the Channel when the operation is done.

Alternatively, you could simplify the code using a pre-defined listener:

            f.addListener(ChannelFutureListener.CLOSE);
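
The "close only after the write has completed" pattern is not specific to Netty. As a rough analogy, the sketch below reproduces it with the JDK's CompletableFuture. FakeChannel is a hypothetical stand-in that only records events; it is not a Netty class, and CompletableFuture is used here in place of ChannelFuture purely for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class CloseAfterWriteSketch {
    // Hypothetical stand-in for a connection; it only records what happened, in order.
    static final class FakeChannel {
        final List<String> events = new CopyOnWriteArrayList<>();

        // Starts an asynchronous "write" and returns a future that completes when it is done.
        CompletableFuture<Void> write(String message) {
            return CompletableFuture.runAsync(() -> events.add("written: " + message));
        }

        void close() {
            events.add("closed");
        }
    }

    // Closes the channel only after the write future has completed.
    public static List<String> writeThenClose(String message) {
        FakeChannel ch = new FakeChannel();
        CompletableFuture<Void> closed = ch.write(message).thenRun(ch::close);
        closed.join(); // block only so that the demo can inspect the result
        return ch.events;
    }

    public static void main(String[] args) {
        System.out.println(writeThenClose("time")); // [written: time, closed]
    }
}
```

The thenRun callback plays the role of the ChannelFutureListener: it runs only once the write has finished, so the close can never overtake the write.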
            To test if our time server works as expected, you can use the UNIX rdate command:

            $ rdate -o <port> -p <host>
where port is the port number you specified in the main() method and host is usually localhost.

Unlike the DISCARD and ECHO servers, we need a client for the TIME protocol because a human cannot translate 32-bit binary data into a date on a calendar. In this section, we discuss how to make sure the server works correctly and learn how to write a client with Netty.

The biggest and only difference between a server and a client in Netty is that a different Bootstrap and ChannelFactory are required. Please take a look at the following code:
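
    package org.jboss.netty.example.time;

    import java.net.InetSocketAddress;
    import java.util.concurrent.Executors;

    import org.jboss.netty.bootstrap.ClientBootstrap;
    import org.jboss.netty.channel.ChannelFactory;
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

    public class TimeClient {
        public static void main(String[] args) throws Exception {
            String host = args[0];
            int port = Integer.parseInt(args[1]);

            // A client-side ChannelFactory replaces NioServerSocketChannelFactory.
            ChannelFactory factory =
                new NioClientSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool());

            // ClientBootstrap is the client-side counterpart of ServerBootstrap.
            ClientBootstrap bootstrap = new ClientBootstrap(factory);
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                public ChannelPipeline getPipeline() {
                    return Channels.pipeline(new TimeClientHandler());
                }
            });

            // No "child." prefix: a client-side channel has no parent channel.
            bootstrap.setOption("tcpNoDelay", true);
            bootstrap.setOption("keepAlive", true);

            // A client connects instead of binding.
            bootstrap.connect(new InetSocketAddress(host, port));
        }
    }
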
As you can see, it is not really different from the server-side startup. What about the ChannelHandler implementation? It should receive a 32-bit integer from the server, translate it into a human-readable format, print the translated time, and close the connection:
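
    package org.jboss.netty.example.time;

    import java.util.Date;

    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.MessageEvent;
    import org.jboss.netty.channel.SimpleChannelHandler;

    public class TimeClientHandler extends SimpleChannelHandler {
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
            ChannelBuffer buf = (ChannelBuffer) e.getMessage();
            // The server sends unsigned seconds since 1900; convert to Unix milliseconds.
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            e.getChannel().close();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
            e.getCause().printStackTrace();
            e.getChannel().close();
        }
    }
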
It looks very simple and does not look any different from the server-side example. However, this handler will sometimes refuse to work, raising an IndexOutOfBoundsException. We discuss why this happens in the next section.

In a stream-based transport such as TCP/IP, received data is stored in a socket receive buffer. Unfortunately, the buffer of a stream-based transport is not a queue of packets but a queue of bytes. This means that even if you send two messages as two independent packets, the operating system will not treat them as two messages but as just a bunch of bytes. Therefore, there is no guarantee that what you read is exactly what your remote peer wrote. For example, let us assume that the TCP/IP stack of an operating system has received three packets:

    +-----+-----+-----+
    | ABC | DEF | GHI |
    +-----+-----+-----+

Because of this general property of a stream-based protocol, there is a high chance of reading them in the following fragmented form in your application:

    +----+-------+---+---+
    | AB | CDEFG | H | I |
    +----+-------+---+---+

Therefore, the receiving side, regardless of whether it is a server or a client, should defragment the received data into one or more meaningful frames that can be easily understood by the application logic. In the case of the example above, the received data should be framed as follows:

    +-----+-----+-----+
    | ABC | DEF | GHI |
    +-----+-----+-----+

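
The re-framing shown in the diagrams can be sketched in a few lines of plain Java. This is an illustration only, not Netty code: FixedLengthFramer is a hypothetical class, each frame is assumed to be exactly three characters long as in the diagrams, and strings stand in for raw bytes to keep the output readable.

```java
import java.util.ArrayList;
import java.util.List;

class FixedLengthFramer {
    private final int frameLength;
    private final StringBuilder pending = new StringBuilder(); // received but not yet framed

    public FixedLengthFramer(int frameLength) {
        this.frameLength = frameLength;
    }

    // Accepts one arbitrarily sized chunk and returns every frame completed by it.
    public List<String> receive(String chunk) {
        pending.append(chunk);
        List<String> frames = new ArrayList<>();
        while (pending.length() >= frameLength) {
            frames.add(pending.substring(0, frameLength));
            pending.delete(0, frameLength);
        }
        return frames;
    }

    public static void main(String[] args) {
        FixedLengthFramer framer = new FixedLengthFramer(3);
        for (String chunk : new String[] {"AB", "CDEFG", "H", "I"}) {
            System.out.println(chunk + " -> " + framer.receive(chunk));
        }
    }
}
```

Feeding the four fragments in order yields no frame for "AB", the frames ABC and DEF for "CDEFG", nothing for "H", and finally GHI for "I".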
The First Solution

Now let us get back to the TIME client example. We have the same problem here. A 32-bit integer is a very small amount of data, and it is not likely to be fragmented often. However, the problem is that it can be fragmented, and the possibility of fragmentation will increase as the traffic increases.

The simplistic solution is to create an internal cumulative buffer and wait until all 4 bytes are received into the internal buffer. The following is the modified TimeClientHandler implementation that fixes the problem:

    package org.jboss.netty.example.time;

    import static org.jboss.netty.buffer.ChannelBuffers.*;

    import java.util.Date;

    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.MessageEvent;
    import org.jboss.netty.channel.SimpleChannelHandler;

    public class TimeClientHandler extends SimpleChannelHandler {

        private final ChannelBuffer buf = dynamicBuffer(); // (19)

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
            ChannelBuffer m = (ChannelBuffer) e.getMessage();
            buf.writeBytes(m); // (20)

            if (buf.readableBytes() >= 4) { // (21)
                // The server sends unsigned seconds since 1900; convert to Unix milliseconds.
                long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
                System.out.println(new Date(currentTimeMillis));
                e.getChannel().close();
            }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
            e.getCause().printStackTrace();
            e.getChannel().close();
        }
    }

(19) A dynamic buffer is a ChannelBuffer which increases its capacity on demand. It is very useful when you don't know the length of the message.

(20) First, all received data is accumulated into buf.

(21) Then, the handler must check whether buf has enough data, 4 bytes in this example, before proceeding to the actual business logic. Otherwise, Netty will call the messageReceived method again when more data arrives, and eventually all 4 bytes will be accumulated.

The Second Solution

Although the first solution has resolved the problem with the TIME client, the modified handler does not look that clean. Imagine a more complicated protocol which is composed of multiple fields, such as a variable-length field. Your ChannelHandler implementation will become unmaintainable very quickly.

As you may have noticed, you can add more than one ChannelHandler to a ChannelPipeline, and therefore you can split one monolithic ChannelHandler into multiple modular ones to reduce the complexity of your application. For example, you could split TimeClientHandler into two handlers: a TimeDecoder, which deals with the fragmentation issue, and the initial simple version of TimeClientHandler.

Fortunately, Netty provides an extensible class which helps you write the first one out of the box:
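
    package org.jboss.netty.example.time;

    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.handler.codec.frame.FrameDecoder;

    public class TimeDecoder extends FrameDecoder { // (22)
        @Override
        protected Object decode(
                ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) { // (23)
            if (buffer.readableBytes() < 4) {
                return null; // (24)
            }
            return buffer.readBytes(4); // (25)
        }
    }
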
(22) FrameDecoder is an implementation of ChannelHandler which makes it easy to deal with the fragmentation issue.

(23) FrameDecoder calls the decode method with an internally maintained cumulative buffer whenever new data is received.

(24) If null is returned, it means there is not enough data yet. FrameDecoder will call decode again when more data arrives.

(25) If a non-null value is returned, it means the decode method has decoded a message successfully. FrameDecoder will discard the read part of its internal cumulative buffer. Please remember that you don't need to decode multiple messages. FrameDecoder will keep calling the decode method until it returns null.
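
To make the contract described above concrete, here is a small JDK-only sketch of a cumulative decoder. It is not Netty's FrameDecoder: the classes FrameDecodingSketch, CumulativeDecoder and LengthPrefixedDecoder are hypothetical, java.nio.ByteBuffer stands in for ChannelBuffer, and the wire format (one length byte followed by that many bytes of ASCII text) is invented for the example. What it illustrates is the loop itself: append the new bytes, call decode until it returns null, and keep the unread remainder for the next round.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class FrameDecodingSketch {
    abstract static class CumulativeDecoder {
        private ByteBuffer cumulation = ByteBuffer.allocate(0); // always kept ready for reading

        // Returns one decoded message, or null if a whole message has not arrived yet.
        // An implementation must not consume any bytes when it returns null.
        protected abstract Object decode(ByteBuffer buffer);

        // Appends newly received bytes, then calls decode repeatedly until it returns null.
        public List<Object> feed(byte[] received) {
            ByteBuffer merged = ByteBuffer.allocate(cumulation.remaining() + received.length);
            merged.put(cumulation).put(received);
            merged.flip();
            cumulation = merged; // bytes consumed by earlier decode calls are dropped here

            List<Object> messages = new ArrayList<>();
            Object message;
            while ((message = decode(cumulation)) != null) {
                messages.add(message);
            }
            return messages;
        }
    }

    // Hypothetical protocol: one length byte followed by that many bytes of ASCII text.
    static final class LengthPrefixedDecoder extends CumulativeDecoder {
        @Override
        protected Object decode(ByteBuffer buffer) {
            if (buffer.remaining() < 1) {
                return null; // the length byte has not arrived yet
            }
            int length = buffer.get(buffer.position()) & 0xFF; // peek without consuming
            if (buffer.remaining() < 1 + length) {
                return null; // the body is still incomplete
            }
            buffer.get(); // consume the length byte
            byte[] body = new byte[length];
            buffer.get(body);
            return new String(body, StandardCharsets.US_ASCII);
        }
    }

    public static void main(String[] args) {
        LengthPrefixedDecoder decoder = new LengthPrefixedDecoder();
        // Two messages, "hi" and "netty", split at an awkward place.
        System.out.println(decoder.feed(new byte[] {2, 'h'}));
        System.out.println(decoder.feed(new byte[] {'i', 5, 'n', 'e', 't', 't', 'y'}));
    }
}
```

The first call yields no message because the body of "hi" is incomplete; the second call completes it and also decodes "netty" in the same pass, without the decode method itself having to loop.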

Now that we have another handler to insert into the ChannelPipeline, we should modify the ChannelPipelineFactory implementation in the TimeClient:

    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            return Channels.pipeline(
                    new TimeDecoder(),
                    new TimeClientHandler());
        }
    });

If you are an adventurous person, you might want to try the ReplayingDecoder, which simplifies the decoder even more. You will need to consult the API reference for more information, though.

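    package org.jboss.netty.example.time;

    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
    import org.jboss.netty.handler.codec.replay.VoidEnum;

    public class TimeDecoder extends ReplayingDecoder<VoidEnum> {
        @Override
        protected Object decode(
                ChannelHandlerContext ctx, Channel channel,
                ChannelBuffer buffer, VoidEnum state) {
            return buffer.readBytes(4);
        }
    }
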
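
Notice that this decoder performs no length check at all. The general idea that makes this possible can be sketched with the JDK alone: write the decoding logic as if all bytes were present, and when the buffer runs dry, rewind to the starting position and try again once more data has arrived. The sketch below is only an approximation of that idea under stated assumptions; ReplaySketch and its methods are hypothetical, and it relies on java.nio.ByteBuffer throwing BufferUnderflowException rather than on any Netty class. Consult the ReplayingDecoder API reference for how Netty actually implements it.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

class ReplaySketch {
    // Decoding logic written as if all the bytes were already available.
    public static long decodeTime(ByteBuffer buffer) {
        // getInt() throws BufferUnderflowException if fewer than 4 bytes remain.
        return buffer.getInt() & 0xFFFFFFFFL;
    }

    // Tries to decode; on underflow, rewinds to the start and reports "not yet" as null.
    public static Long tryDecode(ByteBuffer buffer) {
        int checkpoint = buffer.position();
        try {
            return decodeTime(buffer);
        } catch (BufferUnderflowException notEnoughData) {
            buffer.position(checkpoint); // replay from here once more data has arrived
            return null;
        }
    }

    public static void main(String[] args) {
        ByteBuffer partial = ByteBuffer.wrap(new byte[] {0, 0});
        System.out.println(tryDecode(partial));  // null
        ByteBuffer complete = ByteBuffer.wrap(new byte[] {0, 0, 1, 2});
        System.out.println(tryDecode(complete)); // 258
    }
}
```

The trade-off is that the decoding logic may run more than once for the same bytes, so it should not have side effects before it has read everything it needs.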
Additionally, Netty provides out-of-the-box decoders which enable you to implement most protocols very easily and help you avoid ending up with a monolithic, unmaintainable handler implementation. Please refer to the following packages for more detailed examples:

  • org.jboss.netty.example.factorial for a binary protocol
  • org.jboss.netty.example.telnet for a text line-based protocol

All the examples we have reviewed so far used a ChannelBuffer as the primary data structure of a protocol message. In this section, we will improve the TIME protocol client and server example to use a POJO instead of a ChannelBuffer.

The advantage of using a POJO in your ChannelHandler is obvious: your handler becomes more maintainable and reusable when the code which extracts information from the ChannelBuffer is separated out of the handler. In the TIME client and server examples, we read only one 32-bit integer, and it is not a major issue to use ChannelBuffer directly. However, you will find it necessary to make the separation as you implement a real-world protocol.

First, let us define a new type called UnixTime.
                package org.jboss.netty.example.time;

                import java.util.Date;

                public class UnixTime {
                    private final int value;

                    public UnixTime(int value) {
                        this.value = value;
                    }

                    public int getValue() {
                        return value;
                    }

                    @Override
                    public String toString() {
                        return new Date(value * 1000L).toString();
                    }
                }
            We can now revise the TimeDecoder to return a UnixTime instead of a ChannelBuffer.
                @Override
                protected Object decode(
                        ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {
                    if (buffer.readableBytes() < 4) {
                        return null;
                    }

                    return new UnixTime(buffer.readInt());
                }
                                          FrameDecoder and ReplayingDecoder allow you to return an object
                                          of any type.  If they were restricted to return only a
                                          ChannelBuffer, we would have to insert another ChannelHandler
                                          which transforms a ChannelBuffer into a
                                          UnixTime.
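            The length check in decode() can be illustrated without Netty. The following is a minimal JDK-only sketch, assuming a hypothetical class TimeFrameSketch that accumulates received fragments in a java.nio.ByteBuffer and yields a value only once four bytes are available. It is not how FrameDecoder is implemented; it only mirrors the "return null until enough bytes have arrived" contract.

```java
import java.nio.ByteBuffer;

/** JDK-only illustration of the "wait for 4 bytes" rule; not Netty's FrameDecoder. */
final class TimeFrameSketch {
    // Cumulative buffer holding the bytes received so far (kept in write mode).
    private final ByteBuffer cumulation = ByteBuffer.allocate(64);

    /** Feeds one received fragment; returns the decoded value, or null if fewer than 4 bytes are buffered. */
    Integer feed(byte[] fragment) {
        cumulation.put(fragment);
        cumulation.flip();                 // switch to read mode
        Integer result = null;
        if (cumulation.remaining() >= 4) {
            result = cumulation.getInt();  // ByteBuffer is big-endian by default
        }
        cumulation.compact();              // keep unread bytes, switch back to write mode
        return result;
    }
}
```

            Feeding the fragments {0, 0}, {1}, and {2, 0x7F} in that order returns null twice and then a value, while the trailing 0x7F byte stays buffered for the next frame.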
                           
                              With the updated decoder, the client-side handler no longer needs to
                              deal with ChannelBuffer:

                @Override
                public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
                    UnixTime m = (UnixTime) e.getMessage();
                    System.out.println(m);
                    e.getChannel().close();
                }
                              Much simpler and more elegant, right?  The same technique can be applied on
                              the server side.  Let us update the
                              TimeServerHandler first this time:
                           
                @Override
                public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
                    // UnixTime takes an int, so the long result must be narrowed explicitly.
                    UnixTime time = new UnixTime((int) (System.currentTimeMillis() / 1000));
                    ChannelFuture f = e.getChannel().write(time);
                    f.addListener(ChannelFutureListener.CLOSE);
                }
                              Now, the only missing piece is an encoder, which is an implementation of
                              ChannelHandler that translates a UnixTime back
                              into a ChannelBuffer.  It's much simpler than writing a decoder because
                              there's no need to deal with packet fragmentation and assembly when
                              encoding a message.
                           
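                package org.jboss.netty.example.time;

                import static org.jboss.netty.buffer.ChannelBuffers.*;

                import org.jboss.netty.buffer.ChannelBuffer;
                import org.jboss.netty.channel.ChannelHandlerContext;
                import org.jboss.netty.channel.Channels;
                import org.jboss.netty.channel.MessageEvent;
                import org.jboss.netty.channel.SimpleChannelHandler;

                public class TimeEncoder extends SimpleChannelHandler {
                    @Override
                    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) {
                        UnixTime time = (UnixTime) e.getMessage();
                        ChannelBuffer buf = buffer(4);
                        buf.writeInt(time.getValue());
                        Channels.write(ctx, e.getFuture(), buf);
                    }
                }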
                                          An encoder overrides the writeRequested
                                          method to intercept a write request.  Please note that the
                                          MessageEvent parameter here is the same type which was specified
                                          in messageReceived but they are interpreted
                                          differently.  A ChannelEvent can be either an
                                          upstream or downstream
                                          event depending on the direction where the event flows.
                                          For instance, a MessageEvent can be an upstream event when called
                                          for messageReceived or a downstream event
                                          when called for writeRequested.
                                          Please refer to the API reference to learn more about the difference
                                          between an upstream event and a downstream event.
                                          Once done with transforming a POJO into a ChannelBuffer, you should
                                          forward the new buffer to the previous ChannelDownstreamHandler in
                                          the ChannelPipeline.  Channels provides various helper methods
                                          which generate and send a ChannelEvent.  In this example,
                                          Channels.write(...) method creates a new
                                          MessageEvent and sends it to the previous ChannelDownstreamHandler
                                          in the ChannelPipeline.
                                          On the other hand, it is a good idea to use static imports for
                                          Channels:
                                          

                import static org.jboss.netty.channel.Channels.*;
                ...
                ChannelPipeline pipeline = pipeline();
                write(ctx, e.getFuture(), buf);
                fireChannelDisconnected(ctx);
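
            To see that the encoder and the decoder agree on the wire format, here is a small JDK-only sketch, assuming a hypothetical TimeCodecSketch class. It writes the value as four big-endian bytes, which is assumed to correspond to what buffer(4) followed by writeInt() produces with the default byte order, and reads it back. It is an illustration of the format, not a replacement for TimeEncoder.

```java
import java.nio.ByteBuffer;

/** JDK-only illustration of the 4-byte TIME frame; class and method names are hypothetical. */
final class TimeCodecSketch {
    /** Encodes the seconds value as a 4-byte big-endian frame. */
    static byte[] encode(int epochSeconds) {
        return ByteBuffer.allocate(4).putInt(epochSeconds).array();
    }

    /** Decodes a complete 4-byte frame back into the seconds value. */
    static int decode(byte[] frame) {
        if (frame.length != 4) {
            throw new IllegalArgumentException("expected exactly 4 bytes, got " + frame.length);
        }
        return ByteBuffer.wrap(frame).getInt();
    }
}
```

            Because encoding always produces a complete frame, the encoder side needs no buffering logic, which is why it is simpler than the decoder.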
            The last task left is to insert a TimeEncoder into the ChannelPipeline on the server side, and it is left as a trivial exercise.

            If you ran the TimeClient, you must have noticed that the application does not exit but just keeps running, doing nothing. Looking at a full thread dump, you will also find that a couple of I/O threads are still running. To shut down the I/O threads and let the application exit gracefully, you need to release the resources allocated by ChannelFactory.

            The shutdown process of a typical network application is composed of the following three steps: (1) close all server sockets, if there are any; (2) close all non-server sockets (i.e. client sockets and accepted sockets), if there are any; and (3) release all resources used by ChannelFactory.

            To apply the three steps above to the TimeClient, TimeClient.main() could shut itself down gracefully by closing its only client connection and releasing all resources used by ChannelFactory:
                package org.jboss.netty.example.time;

                public class TimeClient {
                    public static void main(String[] args) throws Exception {
                        ChannelFactory factory = ...;
                        ClientBootstrap bootstrap = ...;
                        ...
                        ChannelFuture future = bootstrap.connect(...);
                        future.awaitUninterruptibly();
                        if (!future.isSuccess()) {
                            future.getCause().printStackTrace();
                        }
                        future.getChannel().getCloseFuture().awaitUninterruptibly();
                        factory.releaseExternalResources();
                    }
                }
                                          The connect method of ClientBootstrap
                                          returns a ChannelFuture which notifies when a connection attempt
                                          succeeds or fails.  It also has a reference to the Channel which
                                          is associated with the connection attempt.
                                          If the connection attempt failed, we print the cause of the failure.
                                          The getCause() method of ChannelFuture
                                          returns the cause of the failure if the connection attempt was neither
                                          successful nor cancelled.
                                          Now that the connection attempt is over, we need to wait until the
                                          connection is closed by waiting for the closeFuture
                                          of the Channel.  Every Channel has its own closeFuture
                                          so that you are notified and can perform a certain action on closure.
                                          Even if the connection attempt has failed the closeFuture
                                          will be notified because the Channel will be closed automatically
                                          when the connection attempt fails.
                                          All connections have been closed at this point.  The only task left
                                          is to release the resources being used by ChannelFactory.  It is as
                                          simple as calling its releaseExternalResources()
                                          method.  All resources including the NIO Selectors
                                          and thread pools will be shut down and terminated automatically.
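
            Why the JVM stays alive until the resources are released can be reproduced with plain java.util.concurrent. The sketch below is an analogy rather than Netty code: the hypothetical ReleaseResourcesSketch uses a fixed thread pool as a stand-in for the factory's I/O threads, and shutdown() plays the role that releaseExternalResources() is assumed to play for those threads.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Analogy only: a plain thread pool standing in for the I/O threads of a ChannelFactory. */
final class ReleaseResourcesSketch {
    /** Runs one task, then releases the pool; returns true if all pool threads terminated. */
    static boolean runThenRelease() {
        // The pool's threads are non-daemon, so they keep the JVM alive while idle.
        ExecutorService ioThreads = Executors.newFixedThreadPool(2);
        try {
            ioThreads.submit(() -> { /* stand-in for I/O work */ }).get(); // wait for the work
            ioThreads.shutdown();  // without this call the process would not exit
            return ioThreads.awaitTermination(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            ioThreads.shutdownNow();
            throw new IllegalStateException(e);
        }
    }
}
```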
                              Shutting down a client was pretty easy, but how about shutting down a
                              server?  You need to unbind from the port and close all open accepted
                              connections.  To do this, you need a data structure that keeps track of
                              the list of active connections, and it's not a trivial task.  Fortunately,
                              there is a solution, ChannelGroup.
                              ChannelGroup is a special extension of Java collections API which
                              represents a set of open Channels.  If a Channel is added to a
                              ChannelGroup and the added Channel is closed, the closed Channel
                              is removed from its ChannelGroup automatically.  You can also perform
                              an operation on all Channels in the same group.  For instance, you can
                              close all Channels in a ChannelGroup when you shut down your server.
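
            The automatic-removal behaviour described above can be sketched in plain Java. ConnectionGroupSketch and its nested Connection type are hypothetical stand-ins, not Netty classes, and the sketch assumes that each connection belongs to at most one group, which is a simplification.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the idea behind ChannelGroup; not Netty's implementation. */
final class ConnectionGroupSketch {
    /** Hypothetical stand-in for a Channel: it can only be closed. */
    static final class Connection {
        private volatile ConnectionGroupSketch group;
        private volatile boolean open = true;

        boolean isOpen() { return open; }

        void close() {
            open = false;
            ConnectionGroupSketch g = group;
            if (g != null) {
                g.members.remove(this);  // automatic removal on close
            }
        }
    }

    private final Set<Connection> members = ConcurrentHashMap.newKeySet();

    void add(Connection c) {
        c.group = this;
        members.add(c);
    }

    int size() { return members.size(); }

    /** Closes every member; each close() also removes that member from the set. */
    void closeAll() {
        for (Connection c : members) {  // weakly consistent iterator tolerates concurrent removal
            c.close();
        }
    }
}
```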
                              To keep track of open sockets, you need to modify the
                              TimeServerHandler to add a new open Channel to
                              the global ChannelGroup, TimeServer.allChannels:
                           
                @Override
                public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
                    TimeServer.allChannels.add(e.getChannel());
                }
                              Now that the list of all active Channels is maintained automatically,
                              shutting down a server is as easy as shutting down a client:
                           
                package org.jboss.netty.example.time;

                public class TimeServer {
                    static final ChannelGroup allChannels = new DefaultChannelGroup("time-server");

                    public static void main(String[] args) throws Exception {
                        ...
                        ChannelFactory factory = ...;
                        ServerBootstrap bootstrap = ...;
                        Channel channel = bootstrap.bind(...);
                        allChannels.add(channel);
                        waitForShutdownCommand();
                        ChannelGroupFuture future = allChannels.close();
                        future.awaitUninterruptibly();
                        factory.releaseExternalResources();
                    }
                }
                                          DefaultChannelGroup requires the name of the group as a constructor
                                          parameter.  The group name is solely used to distinguish one group
                                          from others.
                                          The bind method of ServerBootstrap
                                          returns a server side Channel which is bound to the specified
                                          local address.  Calling the close() method
                                          of the returned Channel will make the Channel unbind from the
                                          bound local address.
                                          Any type of Channel can be added to a ChannelGroup, regardless of
                                          whether it is server-side, client-side, or accepted.  Therefore,
                                          you can close the bound Channel along with the accepted Channels
                                          in one shot when the server shuts down.
                                          waitForShutdownCommand() is an imaginary
                                          method that waits for the shutdown signal.  You could wait for a
                                          message from a privileged client or the JVM shutdown hook.
                                          You can perform the same operation on all channels in the same 
                                          ChannelGroup.  In this case, we close all channels, which means
                                          the bound server-side Channel will be unbound and all accepted
                                          connections will be closed asynchronously.  To notify you when all
                                          connections have been closed, it returns a ChannelGroupFuture,
                                          which plays a role similar to that of ChannelFuture.
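
            Since waitForShutdownCommand() is left to the reader, one possible shape is sketched below. It is only an assumption about how such a method could look: the hypothetical ShutdownCommandSketch blocks until a line reading "shutdown" arrives on a BufferedReader, which could wrap System.in or a connection from a privileged client.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;

/** One hypothetical way to implement the imaginary waitForShutdownCommand(). */
final class ShutdownCommandSketch {
    /**
     * Blocks until a line equal to "shutdown" is read or the input ends.
     * Returns true if the command was seen, false if the input ended first.
     */
    static boolean waitForShutdownCommand(BufferedReader in) {
        try {
            String line;
            while ((line = in.readLine()) != null) {
                if (line.trim().equalsIgnoreCase("shutdown")) {
                    return true;
                }
            }
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

            A server could call it as waitForShutdownCommand(new BufferedReader(new InputStreamReader(System.in))) before closing the ChannelGroup.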
                              There is more detailed information about Netty in the upcoming chapters. We
                              also encourage you to review the Netty examples in the 
                              org.jboss.netty.example
                              package.
                              Please also note that the
                              community is always waiting for your
                              questions and ideas to help you and keep improving Netty based on your
                              feedback.
                              
            In this chapter, we will examine what core functionalities are provided in Netty and how they constitute a complete network application development stack on top of the core. Please keep this diagram in mind as you read this chapter. Also keep in mind that a lot of the detailed documentation is in the javadoc. Please click on links to class names and package names.

            1. Rich Buffer Data Structure

            Netty uses its own buffer API instead of NIO ByteBuffer to represent a sequence of bytes. This approach has significant advantages over using ByteBuffer. Netty's new buffer type, ChannelBuffer, has been designed from the ground up to address the problems of ByteBuffer and to meet the daily needs of network application developers. To list a few cool features:

            1.1. Combining and Slicing ChannelBuffers

            When transferring data between communication layers, data often needs to be combined or sliced. For example, if a payload is split over multiple packets, it often needs to be combined for decoding. Traditionally, data from the multiple packets is combined by copying it into a new byte buffer. Netty supports a zero-copy approach whereby a ChannelBuffer "points" to the required buffers, hence eliminating the need to perform a copy.

            2. Universal Asynchronous I/O API

            Traditional I/O APIs in Java provide different types and methods for different transport types. For example, java.net.Socket and java.net.DatagramSocket do not have any common super type and therefore they have very different ways to perform socket I/O.

            This mismatch makes porting a network application from one transport to another tedious and difficult. The lack of portability between transports becomes a problem when you need to support additional transports, as this often entails rewriting the network layer of the application. Logically, many protocols can run on more than one transport such as TCP/IP, UDP/IP, SCTP, and serial port communication.
            To make matters worse, Java's New I/O (NIO) API introduced incompatibilities with the old blocking I/O (OIO) API and will continue to do so in the next release, NIO.2 (AIO). Because all these APIs differ from each other in design and performance characteristics, you are often forced to determine which API your application will depend on before you even begin the implementation phase.

            For instance, you might want to start with OIO because the number of clients you are going to serve will be very small and writing a socket server using OIO is much easier than using NIO. However, you are going to be in trouble when your business grows exponentially and your server needs to serve tens of thousands of clients simultaneously. You could start with NIO, but doing so may hinder rapid development by greatly increasing development time due to the complexity of the NIO Selector API.

            Netty has a universal asynchronous I/O interface called a Channel, which abstracts away all operations required for point-to-point communication. That is, once you have written your application on one Netty transport, it can run on other Netty transports. Netty provides a number of essential transports via one universal API:

            Switching from one transport to another usually takes just a couple of lines of changes, such as choosing a different ChannelFactory implementation. You can even take advantage of new transports which aren't yet written (such as a serial port communication transport), again by replacing just a couple of lines of constructor calls. Moreover, you can write your own transport by extending the core API.

            3. Event Model based on the Interceptor Chain Pattern

            A well-defined and extensible event model is a must for an event-driven application. Netty has a well-defined event model focused on I/O. It also allows you to implement your own event type without breaking the existing code because each event type is distinguished from another by a strict type hierarchy. This is another differentiator against other frameworks.
            Many NIO frameworks have no or a very limited notion of an event model. If they offer extension at all, they often break the existing code when you try to add custom event types.

            A ChannelEvent is handled by a list of ChannelHandlers in a ChannelPipeline. The pipeline implements an advanced form of the Intercepting Filter pattern to give a user full control over how an event is handled and how the handlers in the pipeline interact with each other. For example, you can define what to do when data is read from a socket:
                  public class MyReadHandler extends SimpleChannelHandler {
                      @Override
                      public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) {
                          Object message = evt.getMessage();
                          // Do something with the received message.
                          // And forward the event to the next handler.
                          ctx.sendUpstream(evt);
                      }
                  }
              You can also define what to do when a handler receives a write request:
                  public class MyWriteHandler extends SimpleChannelHandler {
                      @Override
                      public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt) {
                          Object message = evt.getMessage();
                          // Do something with the message to be written.
                          // And forward the event to the next handler.
                          ctx.sendDownstream(evt);
                      }
                  }
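
              The way an event travels through a chain of handlers, with each handler deciding whether and what to forward, can be sketched in a few lines of plain Java. PipelineSketch, Handler, and Context below are hypothetical names that only mimic the shape of ChannelPipeline, ChannelHandler, and ChannelHandlerContext; the sketch covers a single direction and leaves out everything else the real pipeline does.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal interceptor chain; an illustration of the pattern, not Netty's ChannelPipeline. */
final class PipelineSketch {
    /** A handler may transform the message and decides whether to forward it. */
    interface Handler {
        void handle(Context ctx, Object message);
    }

    /** Gives a handler access to whatever comes next in the chain. */
    static final class Context {
        private final List<Handler> handlers;
        private final int index;
        private final List<Object> sink;

        Context(List<Handler> handlers, int index, List<Object> sink) {
            this.handlers = handlers;
            this.index = index;
            this.sink = sink;
        }

        /** Forwards to the next handler, or to the sink when the chain ends. */
        void sendUpstream(Object message) {
            int next = index + 1;
            if (next < handlers.size()) {
                handlers.get(next).handle(new Context(handlers, next, sink), message);
            } else {
                sink.add(message);
            }
        }
    }

    private final List<Handler> handlers = new ArrayList<>();
    private final List<Object> delivered = new ArrayList<>();

    PipelineSketch addLast(Handler handler) {
        handlers.add(handler);
        return this;
    }

    /** Fires a message into the head of the chain. */
    void fire(Object message) {
        new Context(handlers, -1, delivered).sendUpstream(message);
    }

    /** Messages that reached the end of the chain. */
    List<Object> delivered() {
        return delivered;
    }
}
```

              A handler that does not call sendUpstream() swallows the event, which is how a filter in the chain can drop or replace messages.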
              For more information on the event model, please refer to the API documentation of ChannelEvent and ChannelPipeline.

              4. Advanced Components for More Rapid Development

              On top of the core components mentioned above, which already enable the implementation of all types of network applications, Netty provides a set of advanced features to accelerate the pace of development even more.

              4.1. Codec framework

              As demonstrated in Section 8, “Speaking in POJO instead of ChannelBuffer”, it is always a good idea to separate a protocol codec from business logic. However, there are some complications when implementing this idea from scratch. You have to deal with the fragmentation of messages. Some protocols are multi-layered (i.e. built on top of other lower level protocols). Some are too complicated to be implemented in a single state machine.

              Consequently, a good network application framework should provide an extensible, reusable, unit-testable, and multi-layered codec framework that generates maintainable user codecs.

              Netty provides a number of basic and advanced codecs to address most issues you will encounter when you write a protocol codec, regardless of whether it is simple or complex, binary or text.

              4.2. SSL / TLS Support

              Unlike old blocking I/O, it is a non-trivial task to support SSL in NIO. You can't simply wrap a stream to encrypt or decrypt data; you have to use javax.net.ssl.SSLEngine. SSLEngine is a state machine which is as complex as SSL itself. You have to manage all possible states such as cipher suite and encryption key negotiation (or re-negotiation), certificate exchange, and validation. Moreover, SSLEngine is not even completely thread-safe, contrary to what one would expect.

              In Netty, SslHandler takes care of all the gory details and pitfalls of SSLEngine. All you need to do is to configure the SslHandler and insert it into your ChannelPipeline. It also allows you to implement advanced features like StartTLS very easily.

              4.3. HTTP Implementation

              HTTP is definitely the most popular protocol on the Internet.
              There are already a number of HTTP implementations such as a Servlet container. Then why does Netty have HTTP on top of its core?

              Netty's HTTP support is very different from the existing HTTP libraries. It gives you complete control over how HTTP messages are exchanged at a low level. Because it is basically the combination of an HTTP codec and HTTP message classes, there is no restriction such as an enforced thread model. That is, you can write your own HTTP client or server that works exactly the way you want. You have full control over everything that's in the HTTP specification, including the thread model, connection life cycle, and chunked encoding.

              Thanks to its highly customizable nature, you can write a very efficient HTTP server such as:

              4.4. WebSockets Implementation

              WebSockets allows for a bi-directional, full-duplex communication channel over a single Transmission Control Protocol (TCP) socket. It is designed to allow streaming of data between a web browser and a web server.

              The WebSocket protocol has been standardized by the IETF as RFC 6455.

              Netty implements RFC 6455 and a number of older versions of the specification. Please refer to the org.jboss.netty.handler.codec.http.websocketx package and associated examples.

              4.5. Google Protocol Buffer Integration

              Google Protocol Buffers are an ideal solution for the rapid implementation of highly efficient binary protocols that evolve over time. With ProtobufEncoder and ProtobufDecoder, you can turn the message classes generated by the Google Protocol Buffers Compiler (protoc) into a Netty codec. Please take a look at the 'LocalTime' example, which shows how easily you can create a high-performing binary protocol client and server from the sample protocol definition.

              5. Summary

              In this chapter, we reviewed the overall architecture of Netty from the feature standpoint. Netty has a simple, yet powerful architecture. It is composed of three components - buffer, channel, and event model - and all advanced features are built on top of the three core components.
              Once you understand how these three work together, it should not be difficult to understand the more advanced features which were covered briefly in this chapter.

              You might still have unanswered questions about what the overall architecture looks like exactly and how each of the features works together with the others. If so, it is a good idea to talk to us to improve this guide.
              This FAQ is a summary of questions and answers from StackOverflow.

              1. When can I write downstream data?

              As long as you have a reference to the Channel (or ChannelHandlerContext), you can call Channel.write() (or Channels.write()) from anywhere, on any thread. writeRequested() is called when you trigger the writeRequested event by calling Channel.write() or ChannelHandlerContext.sendDownstream(MessageEvent).

              2. How do I incorporate my blocking application code with the non-blocking NioServerSocketChannelFactory?

              NioServerSocketChannelFactory uses boss threads and worker threads. Boss threads are responsible for accepting incoming connections while worker threads are responsible for performing non-blocking reads and writes for the associated channels. The default number of worker threads in the pool is 2 * the number of available processors. If your application's handler blocks (for example, while reading from a database) or is CPU intensive, the worker thread pool may be exhausted and performance will degrade. We recommend that you run your blocking application code in another thread pool. You can do this by adding an ExecutionHandler backed by an OrderedMemoryAwareThreadPoolExecutor to the channel pipeline before your handler, or by implementing your own.
                  public static void main(String[] args) throws Exception {
                      // Thread pool that runs the handlers placed after the ExecutionHandler.
                      OrderedMemoryAwareThreadPoolExecutor eventExecutor =
                              new OrderedMemoryAwareThreadPoolExecutor(
                                      5, 1000000, 10000000, 100,
                                      TimeUnit.MILLISECONDS);

                      ServerBootstrap bootstrap = new ServerBootstrap(
                              new NioServerSocketChannelFactory(
                                      Executors.newCachedThreadPool(),
                                      Executors.newCachedThreadPool()));

                      bootstrap.setPipelineFactory(new MyPipelineFactory(eventExecutor));
                      bootstrap.bind(socketAddress);

                      // Other code
                  }

                  public class MyPipelineFactory implements ChannelPipelineFactory {
                      private final Executor pipelineExecutor;

                      public MyPipelineFactory(Executor pipelineExecutor) {
                          this.pipelineExecutor = pipelineExecutor;
                      }

                      @Override
                      public ChannelPipeline getPipeline() throws Exception {
                          // Create a default pipeline implementation.
                          ChannelPipeline pipeline = Channels.pipeline();

                          pipeline.addLast("decoder", new HttpRequestDecoder());
                          pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
                          pipeline.addLast("encoder", new HttpResponseEncoder());
                          pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());

                          // Insert OrderedMemoryAwareThreadPoolExecutor before your blocking handler
                          pipeline.addLast("pipelineExecutor", new ExecutionHandler(pipelineExecutor));

                          // MyHandler contains code that blocks
                          pipeline.addLast("handler", new MyHandler());
                          return pipeline;
                      }
                  }

                  public class MyHandler extends SimpleChannelUpstreamHandler {
                      // Your blocking application code
                  }
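
              If you decide to implement your own executor instead, the key property to preserve is that events of the same channel are still handled one at a time and in order, even though a shared pool runs them. The sketch below illustrates that idea with the serial-executor pattern from the java.util.concurrent.Executor documentation. OrderedExecutorSketch is a hypothetical name; the sketch is not Netty's implementation, does no memory accounting, and never evicts keys.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Per-key ordered execution on a shared pool; illustration only. */
final class OrderedExecutorSketch {
    /** Runs its tasks one at a time, in submission order, on the backing executor. */
    private static final class Serial implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();
        private final Executor backing;
        private Runnable active;

        Serial(Executor backing) { this.backing = backing; }

        @Override
        public synchronized void execute(Runnable task) {
            tasks.add(() -> {
                try {
                    task.run();
                } finally {
                    scheduleNext();  // hand the next queued task to the pool
                }
            });
            if (active == null) {
                scheduleNext();
            }
        }

        private synchronized void scheduleNext() {
            active = tasks.poll();
            if (active != null) {
                backing.execute(active);
            }
        }
    }

    private final Map<Object, Serial> perKey = new ConcurrentHashMap<>();
    private final Executor backing;

    OrderedExecutorSketch(Executor backing) { this.backing = backing; }

    /** Tasks submitted with the same key (for example, the same connection) never overlap or reorder. */
    void execute(Object key, Runnable task) {
        perKey.computeIfAbsent(key, k -> new Serial(backing)).execute(task);
    }

    /** Demo: submits 0..n-1 for a single key on a 4-thread pool and returns the observed order. */
    static List<Integer> demo(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        OrderedExecutorSketch ordered = new OrderedExecutorSketch(pool);
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            final int id = i;
            ordered.execute("connection-1", () -> { seen.add(id); done.countDown(); });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return seen;
    }
}
```

              Tasks for different keys may still run in parallel on the pool, so blocking work for one connection does not stall the others.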

              3. Do I need to synchronize my handler code given that events can happen at the same time?

              Your ChannelUpstreamHandler will be invoked sequentially by the same thread (i.e. an I/O thread) and therefore a handler does not need to worry about being invoked with a new upstream event before the previous upstream event is finished. However, downstream events can be fired by more than one thread simultaneously. If your ChannelDownstreamHandler accesses a shared resource or stores stateful information, you might need proper synchronization.
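
              As an illustration of the downstream case, the sketch below keeps a running total that several threads update at once. WriteCounterSketch and onWriteRequested() are hypothetical stand-ins for a ChannelDownstreamHandler and its writeRequested() method; the point is only that shared state on that path needs an atomic type or a lock.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for stateful downstream handling; not a Netty handler. */
final class WriteCounterSketch {
    // Shared state on a path that several threads may enter at the same time.
    private final AtomicLong bytesRequested = new AtomicLong();

    /** Stand-in for writeRequested(): safe to call from any thread. */
    void onWriteRequested(int length) {
        bytesRequested.addAndGet(length);
    }

    long total() {
        return bytesRequested.get();
    }

    /** Demo: each of the given threads records perThread one-byte writes; returns the total. */
    static long demo(int threads, int perThread) {
        WriteCounterSketch handler = new WriteCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    handler.onWriteRequested(1);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return handler.total();
    }
}
```

              With a plain long field and += in place of AtomicLong, concurrent updates could be lost and the total could come out lower than expected.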
