package org.jboss.netty.example.discard;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class DiscardServer {

    public static void main(String[] args) throws Exception {
        ChannelFactory factory =
            new NioServerSocketChannelFactory(
                    Executors.newCachedThreadPool(),
                    Executors.newCachedThreadPool());

        ServerBootstrap bootstrap = new ServerBootstrap(factory);

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(new DiscardServerHandler());
            }
        });

        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);
        bootstrap.bind(new InetSocketAddress(8080));
    }
}
ChannelFactory is a factory which creates and manages Channels and their
related resources. It processes all I/O requests and performs I/O to
generate ChannelEvents. Netty provides various ChannelFactory
implementations. We are implementing a server-side application in this
example, and therefore NioServerSocketChannelFactory was used. Another
thing to note is that it does not create I/O threads by itself. It
acquires threads from the thread pool you specified in the constructor,
which gives you more control over how threads should be managed in the
environment where your application runs, such as an application server
with a security manager.
ServerBootstrap
is a helper class that sets up a server. You can
set up the server using a Channel
directly. However, please note
that this is a tedious process and you do not need to do that in most
cases.
Here, we configure the ChannelPipelineFactory
. Whenever a new
connection is accepted by the server, a new ChannelPipeline
will be
created by the specified ChannelPipelineFactory
. The new pipeline
contains the DiscardServerHandler
. As the
application gets complicated, it is likely that you will add more
handlers to the pipeline and extract this anonymous class into a top
level class eventually.
You can also set parameters which are specific to the Channel
implementation. We are writing a TCP/IP server, so we are allowed to set
socket options such as tcpNoDelay and keepAlive. Please note that the
"child." prefix was added to all options. It means the options will be
applied to the accepted Channels rather than to the ServerSocketChannel
itself. To set the options of the ServerSocketChannel, you could do the
following:
bootstrap.setOption("reuseAddress", true);
We are ready to go now. What's left is to bind to the port and to start
the server. Here, we bind to port 8080 on all NICs (network interface
cards) in the machine. You can call the bind method as many times as
you want (with different bind addresses).
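For example (the second port below, 8443, is purely illustrative), the
same bootstrap could be bound to two different addresses:

bootstrap.bind(new InetSocketAddress(8080));
bootstrap.bind(new InetSocketAddress(8443)); // hypothetical second port, same configuration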
Now that we have written our first server, we need to test if it really
works. The easiest way to test it is to use the telnet
command. For example, you could enter "telnet localhost
8080" in the command line and type something.
However, can we say that the server is working fine? We cannot really
know that because it is a discard server. You will not get any response
at all. To prove it is really working, let us modify the server to print
what it has received.
We already know that a MessageEvent is generated whenever data is
received and that the messageReceived handler method will be invoked.
Let us put some code into the messageReceived method of the
DiscardServerHandler:
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    ChannelBuffer buf = (ChannelBuffer) e.getMessage();
    while (buf.readable()) {
        System.out.println((char) buf.readByte());
        System.out.flush();
    }
}
It is safe to assume that the message type in socket transports is
always ChannelBuffer. ChannelBuffer is a fundamental data structure
which stores a sequence of bytes in Netty. It is similar to NIO
ByteBuffer, but it is easier to use and more flexible. For example,
Netty allows you to create a composite ChannelBuffer which combines
multiple ChannelBuffers, reducing the number of unnecessary memory
copies.
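For instance, a composite buffer can be assembled without copying via
the ChannelBuffers.wrappedBuffer() helper; the sketch below uses two
arbitrary buffers purely for illustration:

ChannelBuffer header = ChannelBuffers.buffer(8);   // illustrative buffers
ChannelBuffer body = ChannelBuffers.buffer(128);
// wrappedBuffer() creates a composite view over both buffers without copying their bytes.
ChannelBuffer message = ChannelBuffers.wrappedBuffer(header, body);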
Although it closely resembles NIO ByteBuffer, it is highly recommended
that you refer to the API reference. Learning how to use ChannelBuffer
correctly is a critical step in using Netty without difficulty.
The full source code of the discard server is located in the
org.jboss.netty.example.discard
package of the
distribution.
So far, we have been consuming data without responding at all. A server,
however, is usually supposed to respond to a request. Let us learn how to
write a response message to a client by implementing the
ECHO protocol,
where any received data is sent back.
The only difference from the discard server we have implemented in the
previous sections is that it sends the received data back instead of
printing the received data out to the console. Therefore, it is enough
again to modify the messageReceived
method:
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    Channel ch = e.getChannel();
    ch.write(e.getMessage());
}
A ChannelEvent
object has a reference to its associated Channel
.
Here, the returned Channel
represents the connection which received
the MessageEvent
. We can get the Channel
and call the
write
method to write something back to
the remote peer.
The full source code of the echo server is located in the
org.jboss.netty.example.echo
package of the
distribution.
The protocol to implement in this section is the TIME protocol. It is
different from the previous examples in that it sends a message, which
contains a 32-bit integer, without receiving any requests, and closes
the connection once the message is sent. In this example, you will
learn how to construct and send a message, and how to close the
connection on completion.
Because we are going to ignore any received data and instead send a
message as soon as a connection is established, we cannot use the
messageReceived method this time. Instead, we should override the
channelConnected method. The following is the implementation:
package org.jboss.netty.example.time;

public class TimeServerHandler extends SimpleChannelHandler {

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
        Channel ch = e.getChannel();

        ChannelBuffer time = ChannelBuffers.buffer(4);
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        ChannelFuture f = ch.write(time);
        f.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                Channel ch = future.getChannel();
                ch.close();
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        e.getCause().printStackTrace();
        e.getChannel().close();
    }
}
As explained, the channelConnected method will be invoked when a
connection is established. Let us write a 32-bit integer that
represents the current time in seconds there.
To send a new message, we need to allocate a new buffer which will
contain the message. We are going to write a 32-bit integer, and
therefore we need a ChannelBuffer
whose capacity is
4
bytes. The ChannelBuffers
helper class is
used to allocate a new buffer. Besides the
buffer
method, ChannelBuffers
provides a
lot of useful methods related to the ChannelBuffer
. For more
information, please refer to the API reference.
On the other hand, it is a good idea to use static imports for
ChannelBuffers
:
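A minimal sketch of what that looks like (the buffer sizes here are
arbitrary):

import static org.jboss.netty.buffer.ChannelBuffers.*;
...
// With the static import, the helper methods can be called without the class name.
ChannelBuffer dynamicBuf = dynamicBuffer(256);
ChannelBuffer ordinaryBuf = buffer(1024);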
But wait, where's the flip? Didn't we use to call ByteBuffer.flip()
before sending a message in NIO? ChannelBuffer does not have such a
method because it has two pointers: one for read operations and the
other for write operations. The writer index increases when you write
something to a ChannelBuffer while the reader index does not change.
The reader index and the writer index represent where the message
starts and ends respectively.

In contrast, an NIO buffer does not provide a clean way to figure out
where the message content starts and ends without calling the flip
method. You will be in trouble when you forget to flip the buffer
because nothing, or incorrect data, will be sent. Such an error does
not happen in Netty because we have a different pointer for each
operation type. You will find it makes your life much easier as you get
used to it -- a life without flipping out!
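A small illustration of the two indexes (the values are arbitrary):

ChannelBuffer buf = ChannelBuffers.buffer(4);
buf.writeInt(42);          // writer index advances from 0 to 4; reader index stays at 0
int value = buf.readInt(); // reader index advances from 0 to 4; no flip() is needed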
Another point to note is that the write method returns a ChannelFuture.
A ChannelFuture represents an I/O operation which has not yet occurred.
It means that any requested operation might not have been performed
yet, because all operations are asynchronous in Netty. For example, the
following code might close the connection even before a message is
sent:
Channel ch = ...;
ch.write(message);
ch.close();
Therefore, you need to call the close method after the ChannelFuture
returned by the write method notifies you that the write operation has
been done. Please note that close also might not close the connection
immediately, and it returns a ChannelFuture as well.
How do we get notified when the write request is finished then?
This is as simple as adding a ChannelFutureListener
to the returned
ChannelFuture
. Here, we created a new anonymous ChannelFutureListener
which closes the Channel
when the operation is done.
Alternatively, you could simplify the code using a pre-defined
listener:
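For example, the anonymous listener above can be replaced with the
predefined ChannelFutureListener.CLOSE constant:

ChannelFuture f = ch.write(time);
f.addListener(ChannelFutureListener.CLOSE);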
To test if our time server works as expected, you can use the UNIX rdate command:
$ rdate -o <port> -p <host>
where port is the port number you specified in the main()
method and host is usually localhost
.
Unlike the DISCARD and ECHO servers, we need a client for the TIME
protocol because a human cannot translate 32-bit binary data into a
date on a calendar. In this section, we discuss how to make sure the
server works correctly and learn how to write a client with Netty.

The biggest and only difference between a server and a client in Netty
is that a different Bootstrap and ChannelFactory are required. Please
take a look at the following code:
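A sketch along these lines, mirroring the discard server above but
using ClientBootstrap and NioClientSocketChannelFactory (reading host
and port from the command line is an illustrative assumption):

package org.jboss.netty.example.time;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

public class TimeClient {

    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);

        // A client uses NioClientSocketChannelFactory and ClientBootstrap
        // instead of their server-side counterparts.
        ChannelFactory factory =
            new NioClientSocketChannelFactory(
                    Executors.newCachedThreadPool(),
                    Executors.newCachedThreadPool());

        ClientBootstrap bootstrap = new ClientBootstrap(factory);

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(new TimeClientHandler());
            }
        });

        // No "child." prefix here; a client has no accepted channels.
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", true);

        bootstrap.connect(new InetSocketAddress(host, port));
    }
}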
As you can see, it is not really different from the server-side
startup. What about the ChannelHandler implementation? It should
receive a 32-bit integer from the server, translate it into a
human-readable format, print the translated time, and close the
connection:
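A sketch of such a handler (the 2208988800-second adjustment mirrors
the offset the server applied above; treat the exact conversion as an
assumption):

package org.jboss.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends SimpleChannelHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        ChannelBuffer buf = (ChannelBuffer) e.getMessage();
        // Undo the offset between the TIME protocol epoch (1900) and the UNIX epoch (1970).
        long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
        System.out.println(new Date(currentTimeMillis));
        e.getChannel().close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        e.getCause().printStackTrace();
        e.getChannel().close();
    }
}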
It looks very simple and does not look any different from the
server-side example. However, this handler will sometimes refuse to
work, raising an IndexOutOfBoundsException. We discuss why this happens
in the next section.
In a stream-based transport such as TCP/IP, received data is stored
into a socket receive buffer. Unfortunately, the buffer of a
stream-based transport is not a queue of packets but a queue of bytes.
This means that even if you sent two messages as two independent
packets, an operating system will not treat them as two messages but as
just a bunch of bytes. Therefore, there is no guarantee that what you
read is exactly what your remote peer wrote. For example, let us assume
that the TCP/IP stack of an operating system has received three
packets:
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
Because of this general property of a stream-based protocol, there is a
high chance of reading them in the following fragmented form in your
application:
+----+-------+---+---+
| AB | CDEFG | H | I |
+----+-------+---+---+
Therefore, a receiving part, regardless of whether it is server-side or
client-side, should reassemble the received data into one or more
meaningful frames that can be easily understood by the application
logic. In the case of the example above, the received data should be
framed like the following:
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
The First Solution
Now let us get back to the TIME client example. We have the same
problem here. A 32-bit integer is a very small amount of data, and it
is not likely to be fragmented often. However, the problem is that it
can be fragmented, and the possibility of
fragmentation will increase as the traffic increases.
The simplest solution is to create an internal cumulative buffer and
wait until all 4 bytes are received into the internal buffer. The
following is the modified TimeClientHandler implementation that fixes
the problem:
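One possible sketch of such a cumulative-buffer handler, reconstructed
from the description that follows (the epoch adjustment again mirrors
the offset the server applied):

package org.jboss.netty.example.time;

import static org.jboss.netty.buffer.ChannelBuffers.*;

import java.util.Date;

public class TimeClientHandler extends SimpleChannelHandler {

    // An internal cumulative buffer that grows on demand.
    private final ChannelBuffer buf = dynamicBuffer();

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        ChannelBuffer m = (ChannelBuffer) e.getMessage();
        buf.writeBytes(m);

        if (buf.readableBytes() >= 4) {
            // All 4 bytes have arrived; decode, print, and close.
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            e.getChannel().close();
        }
    }
}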
A dynamic buffer is a ChannelBuffer
which
increases its capacity on demand. It's very useful when you don't
know the length of the message.
The handler must then check whether buf has enough data, 4 bytes in
this example, before proceeding to the actual business logic.
Otherwise, Netty will call the messageReceived method again when more
data arrives, and eventually all 4 bytes will be accumulated.
Although the first solution has resolved the problem with the TIME
client, the modified handler does not look that clean. Imagine a more
complicated protocol which is composed of multiple fields such as a
variable length field. Your ChannelHandler
implementation will
become unmaintainable very quickly.
As you may have noticed, you can add more than one ChannelHandler
to
a ChannelPipeline
, and therefore, you can split one monolithic
ChannelHandler
into multiple modular ones to reduce the complexity of
your application. For example, you could split TimeClientHandler into
two handlers: TimeDecoder, which deals with the fragmentation issue,
and the initial simple version of TimeClientHandler.
Fortunately, Netty provides an extensible class which helps you write
the first one out of the box:
package org.jboss.netty.example.time;

public class TimeDecoder extends FrameDecoder {

    @Override
    protected Object decode(
            ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {

        if (buffer.readableBytes() < 4) {
            return null;
        }

        return buffer.readBytes(4);
    }
}
FrameDecoder calls the decode method with an internally maintained
cumulative buffer whenever new data is received.

If null is returned, it means there is not enough data yet.
FrameDecoder will call decode again when a sufficient amount of data
has arrived.

If a non-null value is returned, it means the decode method has decoded
a message successfully. FrameDecoder will discard the read part of its
internal cumulative buffer. Please remember that you don't need to
decode multiple messages; FrameDecoder will keep calling the decode
method until it returns null.
Now that we have another handler to insert into the ChannelPipeline
,
we should modify the ChannelPipelineFactory
implementation in the
TimeClient
:
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() {
        return Channels.pipeline(
                new TimeDecoder(),
                new TimeClientHandler());
    }
});
If you are an adventurous person, you might want to try the
ReplayingDecoder
which simplifies the decoder even more. You will
need to consult the API reference for more information though.
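As a rough sketch (assuming the four-byte TIME message is all we need
to read), a ReplayingDecoder-based decoder could shrink to a single
readBytes call:

package org.jboss.netty.example.time;

// An alternative to the FrameDecoder-based TimeDecoder shown above.
public class TimeDecoder extends ReplayingDecoder<VoidEnum> {

    @Override
    protected Object decode(
            ChannelHandlerContext ctx, Channel channel,
            ChannelBuffer buffer, VoidEnum state) {
        // No readableBytes() check: ReplayingDecoder replays this call
        // until at least 4 bytes are available.
        return buffer.readBytes(4);
    }
}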
Additionally, Netty provides out-of-the-box decoders which enable you
to implement most protocols very easily and help you avoid ending up
with a monolithic, unmaintainable handler implementation. Please refer
to the example packages in the distribution, such as
org.jboss.netty.example.factorial and org.jboss.netty.example.telnet,
for more detailed examples.
All the examples we have reviewed so far used a ChannelBuffer
as a
primary data structure of a protocol message. In this section, we will
improve the TIME protocol client and server example to use a
POJO instead of a
ChannelBuffer
.
The advantage of using a POJO in your ChannelHandler
is obvious;
your handler becomes more maintainable and reusable by separating the
code which extracts information from ChannelBuffer
out from the
handler. In the TIME client and server examples, we read only one
32-bit integer and it is not a major issue to use ChannelBuffer
directly.
However, you will find it is necessary to make the separation as you
implement a real world protocol.
First, let us define a new type called UnixTime
.
package org.jboss.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final int value;

    public UnixTime(int value) {
        this.value = value;
    }

    public int getValue() {
        return value;
    }

    @Override
    public String toString() {
        return new Date(value * 1000L).toString();
    }
}
We can now revise the TimeDecoder
to return
a UnixTime
instead of a ChannelBuffer
.
@Override
protected Object decode(
        ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {
    if (buffer.readableBytes() < 4) {
        return null;
    }

    return new UnixTime(buffer.readInt());
}
FrameDecoder
and ReplayingDecoder
allow you to return an object
of any type. If they were restricted to return only a
ChannelBuffer
, we would have to insert another ChannelHandler
which transforms a ChannelBuffer
into a
UnixTime
.
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    UnixTime m = (UnixTime) e.getMessage();
    System.out.println(m);
    e.getChannel().close();
}
Much simpler and more elegant, right? The same technique can be applied
on the server side. Let us update the TimeServerHandler first this
time:
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
    UnixTime time = new UnixTime((int) (System.currentTimeMillis() / 1000));
    ChannelFuture f = e.getChannel().write(time);
    f.addListener(ChannelFutureListener.CLOSE);
}
Now, the only missing piece is an encoder, which is an implementation of
ChannelHandler
that translates a UnixTime
back
into a ChannelBuffer
. It's much simpler than writing a decoder because
there's no need to deal with packet fragmentation and assembly when
encoding a message.
package org.jboss.netty.example.time;

import static org.jboss.netty.buffer.ChannelBuffers.*;

public class TimeEncoder extends SimpleChannelHandler {

    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) {
        UnixTime time = (UnixTime) e.getMessage();

        ChannelBuffer buf = buffer(4);
        buf.writeInt(time.getValue());

        Channels.write(ctx, e.getFuture(), buf);
    }
}
An encoder overrides the writeRequested method to intercept a write
request. Please note that the MessageEvent parameter here is of the
same type as the one in messageReceived, but it is interpreted
differently. A ChannelEvent can be either an upstream or a downstream
event depending on the direction in which the event flows. For
instance, a MessageEvent can be an upstream event when called for
messageReceived or a downstream event when called for writeRequested.
Please refer to the API reference to learn more about the difference
between an upstream event and a downstream event.
Once done with transforming a POJO into a ChannelBuffer, you should
forward the new buffer to the previous ChannelDownstreamHandler in the
ChannelPipeline. Channels provides various helper methods which
generate and send a ChannelEvent. In this example, the
Channels.write(...) method creates a new MessageEvent and sends it to
the previous ChannelDownstreamHandler in the ChannelPipeline.
On the other hand, it is a good idea to use static imports for
Channels
:
import static org.jboss.netty.channel.Channels.*;
...
ChannelPipeline pipeline = pipeline();
write(ctx, e.getFuture(), buf);
fireChannelDisconnected(ctx);
The last task left is to insert a TimeEncoder
into the ChannelPipeline
on the server side, and it is left as a
trivial exercise.
If you ran the TimeClient, you must have noticed that the application
doesn't exit but just keeps running, doing nothing. Looking at the full
stack trace, you will also find that a couple of I/O threads are
running. To shut down the I/O threads and let the application exit
gracefully, you need to release the resources allocated by
ChannelFactory.
The shutdown process of a typical network application is composed of
the following three steps: close all server sockets, close all
non-server sockets (i.e. client sockets and accepted sockets), and
release all resources used by ChannelFactory.
To apply the three steps above to the TimeClient, TimeClient.main()
could shut itself down gracefully by closing its only client connection
and releasing all resources used by the ChannelFactory:
package org.jboss.netty.example.time;

public class TimeClient {

    public static void main(String[] args) throws Exception {
        ...
        ChannelFactory factory = ...;
        ClientBootstrap bootstrap = ...;
        ...
        ChannelFuture future = bootstrap.connect(...);
        future.awaitUninterruptibly();
        if (!future.isSuccess()) {
            future.getCause().printStackTrace();
        }
        future.getChannel().getCloseFuture().awaitUninterruptibly();
        factory.releaseExternalResources();
    }
}
The connect method of ClientBootstrap returns a ChannelFuture which
notifies you when a connection attempt succeeds or fails. It also has a
reference to the Channel which is associated with the connection
attempt.

If the attempt failed, we print the cause of the failure to learn why
it failed. The getCause() method of ChannelFuture will return the cause
of the failure if the connection attempt was neither successful nor
cancelled.
Now that the connection attempt is over, we need to wait until the
connection is closed by waiting for the closeFuture of the Channel.
Every Channel has its own closeFuture, so that you are notified and can
perform a certain action on closure. Even if the connection attempt has
failed, the closeFuture will be notified, because the Channel will be
closed automatically when the connection attempt fails.
All connections have been closed at this point. The only task left is
to release the resources being used by ChannelFactory. It is as simple
as calling its releaseExternalResources() method. All resources,
including the NIO Selectors and thread pools, will be shut down and
terminated automatically.
Shutting down a client was pretty easy, but how about shutting down a
server? You need to unbind from the port and close all open accepted
connections. To do this, you need a data structure that keeps track of
the list of active connections, and it's not a trivial task. Fortunately,
there is a solution, ChannelGroup
.
ChannelGroup is a special extension of the Java Collections API which
represents a set of open Channels. If a Channel is added to a
ChannelGroup and the added Channel is closed, the closed Channel is
removed from its ChannelGroup automatically. You can also perform an
operation on all Channels in the same group. For instance, you can
close all Channels in a ChannelGroup when you shut down your server.
To keep track of open sockets, you need to modify the
TimeServerHandler
to add a new open Channel
to
the global ChannelGroup
, TimeServer.allChannels
:
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
    TimeServer.allChannels.add(e.getChannel());
}
Now that the list of all active Channels is maintained automatically,
shutting down a server is as easy as shutting down a client:
package org.jboss.netty.example.time;

public class TimeServer {

    static final ChannelGroup allChannels = new DefaultChannelGroup("time-server");

    public static void main(String[] args) throws Exception {
        ...
        ChannelFactory factory = ...;
        ServerBootstrap bootstrap = ...;
        ...
        Channel channel = bootstrap.bind(...);
        allChannels.add(channel);
        waitForShutdownCommand();
        ChannelGroupFuture future = allChannels.close();
        future.awaitUninterruptibly();
        factory.releaseExternalResources();
    }
}
DefaultChannelGroup
requires the name of the group as a constructor
parameter. The group name is solely used to distinguish one group
from others.
The bind
method of ServerBootstrap
returns a server side Channel
which is bound to the specified
local address. Calling the close()
method
of the returned Channel
will make the Channel
unbind from the
bound local address.
Any type of Channel can be added to a ChannelGroup, regardless of
whether it is server-side, client-side, or accepted. Therefore, you can
close the bound Channel along with the accepted Channels in one shot
when the server shuts down.
waitForShutdownCommand()
is an imaginary
method that waits for the shutdown signal. You could wait for a
message from a privileged client or the JVM shutdown hook.
You can perform the same operation on all channels in the same
ChannelGroup. In this case, we close all channels, which means the
bound server-side Channel will be unbound and all accepted connections
will be closed asynchronously. To notify you when all connections have
been closed successfully, close() returns a ChannelGroupFuture, which
has a similar role to ChannelFuture.
There is more detailed information about Netty in the upcoming chapters. We
also encourage you to review the Netty examples in the
org.jboss.netty.example
package.
Please also note that the community is always waiting for your
questions and ideas, to help you and to keep improving Netty based on
your feedback.
2. Universal Asynchronous I/O API
3. Event Model based on the Interceptor Chain Pattern
4. Advanced Components for More Rapid Development
   - 4.1. Codec framework
   - 4.2. SSL / TLS Support
   - 4.3. HTTP Implementation
   - 4.4. WebSockets Implementation
   - 4.5. Google Protocol Buffer Integration
5. Summary
In this chapter, we will examine what core functionalities are provided in
Netty and how they constitute a complete network application development
stack on top of the core. Please keep this diagram in mind as you read this
chapter.
Also keep in mind that a lot of the detailed documentation is in the javadoc.
Please click on links to class names and package names.
Netty uses its own buffer API instead of NIO ByteBuffer to represent a
sequence of bytes. This approach has significant advantages over using
ByteBuffer. Netty's new buffer type, ChannelBuffer, has been designed
from the ground up to address the problems of ByteBuffer and to meet
the daily needs of network application developers. To list a few cool
features:
When transferring data between communication layers, data often needs
to be combined or sliced. For example, if a payload is split over
multiple packets, it often needs to be combined for decoding.
Traditionally, data from multiple packets is combined by copying it
into a new byte buffer. Netty supports a zero-copy approach whereby a
ChannelBuffer "points" to the required buffers, hence eliminating the
need to perform a copy.
Traditional I/O APIs in Java provide different types and methods for
different transport types. For example, java.net.Socket and
java.net.DatagramSocket do not have any common super type and therefore
they have very different ways to perform socket I/O.
This mismatch makes porting a network application from one transport to
another tedious and difficult. The lack of portability between
transports becomes a problem when you need to support additional
transports, as this often entails rewriting the network layer of the
application. Logically, many protocols can run on more than one
transport, such as TCP/IP, UDP/IP, SCTP, and serial port communication.
To make matters worse, Java's New I/O (NIO) API introduced
incompatibilities with the old blocking I/O (OIO) API and will continue
to do so in the next release, NIO.2 (AIO). Because all these APIs are
different from each other in design and performance characteristics, you
are often forced to determine which API your application will depend on
before you even begin the implementation phase.
For instance, you might want to start with OIO because the number of
clients you are going to serve will be very small and writing a socket
server using OIO is much easier than using NIO. However, you are going
to be in trouble when your business grows exponentially and your server
needs to serve tens of thousands of clients simultaneously. You could
start with NIO, but doing so may hinder rapid development by greatly
increasing development time due to the complexity of the NIO Selector
API.
Netty has a universal asynchronous I/O interface called a Channel,
which abstracts away all operations required for point-to-point
communication. That is, once you have written your application on one
Netty transport, your application can run on other Netty transports.
Netty provides a number of essential transports, such as NIO-based and
OIO-based socket transports, via one universal API.
Switching from one transport to another usually takes just a couple of
lines of changes, such as choosing a different ChannelFactory
implementation. Also, you are even able to take advantage of new
transports which aren't written yet (such as a serial port
communication transport), again by replacing just a couple of lines of
constructor calls. Moreover, you can write your own transport by
extending the core API.
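For instance, in the discard server above, a switch from NIO to the old
blocking I/O (OIO) transport would, under that assumption, come down to
constructing a different factory:

// Replaces the NioServerSocketChannelFactory construction in the discard server;
// the rest of the bootstrap code stays the same.
ChannelFactory factory =
    new OioServerSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool());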
A well-defined and extensible event model is a must for an event-driven
application. Netty has a well-defined event model focused on I/O. It
also allows you to implement your own event type without breaking the
existing code, because each event type is distinguished from another by
a strict type hierarchy. This is another differentiator against other
frameworks. Many NIO frameworks have no, or a very limited, notion of
an event model. If they offer extension at all, they often break the
existing code when you try to add custom event types.
A ChannelEvent
is handled by a list of ChannelHandler
s in a
ChannelPipeline
. The pipeline implements an advanced form of the
Intercepting Filter
pattern to give a user full control over how an event is handled and how
the handlers in the pipeline interact with each other. For example,
you can define what to do when data is read from a socket:
public class MyReadHandler extends SimpleChannelHandler {

    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) {
        Object message = evt.getMessage();
        // Do something with the received message.

        // And forward the event to the next handler.
        ctx.sendUpstream(evt);
    }
}
You can also define what to do when a handler receives a write request:
public class MyWriteHandler extends SimpleChannelHandler {

    public void writeRequested(ChannelHandlerContext ctx, MessageEvent evt) {
        Object message = evt.getMessage();
        // Do something with the message to be written.

        // And forward the event to the next handler.
        ctx.sendDownstream(evt);
    }
}
For more information on the event model, please refer to the
API documentation of ChannelEvent
and ChannelPipeline
.
On top of the core components mentioned above, which already enable the
implementation of all types of network applications, Netty provides a
set of advanced features to accelerate the pace of development even
more.
As demonstrated in Section 8, “
Speaking in POJO instead of ChannelBuffer
”, it is always a good
idea to separate a protocol codec from business logic. However, there
are some complications when implementing this idea from scratch. You
have to deal with the fragmentation of messages. Some protocols are
multi-layered (i.e. built on top of other lower level protocols). Some
are too complicated to be implemented in a single state machine.
Consequently, a good network application framework should provide an
extensible, reusable, unit-testable, and multi-layered codec framework
that generates maintainable user codecs.
Netty provides a number of basic and advanced codecs to address most
issues you will encounter when you write a protocol codec, regardless
of whether it is simple or complex, binary or text.
Unlike old blocking I/O, it is a non-trivial task to support SSL in
NIO. You can't simply wrap a stream to encrypt or decrypt data;
instead, you have to use javax.net.ssl.SSLEngine. SSLEngine is a state
machine which is as complex as SSL itself. You have to manage all
possible states such as cipher suite and encryption key negotiation (or
re-negotiation), certificate exchange, and validation. Moreover,
SSLEngine is not even completely thread-safe, contrary to what one
might expect.
In Netty, SslHandler
takes care of all the gory details and pitfalls
of SSLEngine
. All you need to do is to configure
the SslHandler
and insert it into your ChannelPipeline
. It also
allows you to implement advanced features like
StartTLS
very easily.
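A minimal sketch of that configuration, assuming a javax.net.ssl.SSLContext
named sslContext and an existing pipeline (both hypothetical here):

// sslContext and pipeline are assumed to exist already.
SSLEngine engine = sslContext.createSSLEngine();
engine.setUseClientMode(false); // server-side example
pipeline.addFirst("ssl", new SslHandler(engine));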
HTTP is definitely the most popular protocol on the Internet. There are
already a number of HTTP implementations, such as Servlet containers.
Then why does Netty have HTTP on top of its core?
Netty's HTTP support is very different from the existing HTTP libraries.
It gives you complete control over how HTTP messages are exchanged at a
low level. Because it is basically the combination of an HTTP codec and
HTTP message classes, there is no restriction such as an enforced thread
model. That is, you can write your own HTTP client or server that works
exactly the way you want. You have full control over everything that's
in the HTTP specification, including the thread model, connection life
cycle, and chunked encoding.
Thanks to its highly customizable nature, you can write a very
efficient HTTP server, such as a chat server that requires persistent
connections or a media streaming server that keeps a connection open
until the whole stream has been sent.
WebSockets provide bi-directional, full-duplex communication channels
over a single Transmission Control Protocol (TCP) socket. They are
designed to allow streaming of data between a web browser and a web
server. The WebSocket protocol has been standardized by the IETF as
RFC 6455. Netty implements RFC 6455 and a number of older versions of
the specification. Please refer to the
org.jboss.netty.handler.codec.http.websocketx package and the
associated examples.
Google Protocol Buffers are an ideal solution for the rapid
implementation of highly efficient binary protocols that evolve over
time. With ProtobufEncoder and ProtobufDecoder, you can turn the
message classes generated by the Google Protocol Buffers Compiler
(protoc) into a Netty codec. Please take a look at the 'LocalTime'
example, which shows how easily you can create a high-performing binary
protocol client and server from the sample protocol definition.
In this chapter, we reviewed the overall architecture of Netty from a
feature standpoint. Netty has a simple yet powerful architecture. It is
composed of three components - buffer, channel, and event model - and
all advanced features are built on top of these three core components.
Once you understand how these three work together, it should not be
difficult to understand the more advanced features which were covered
briefly in this chapter.

You might still have unanswered questions about what the overall
architecture looks like exactly and how each of the features works
together. If so, it is a good idea to talk to us to improve this guide.
- 1. When can I write downstream data?
- 2. How do I incorporate my blocking application code with the non-blocking NioServerSocketChannelFactory?
- 3. Do I need to synchronize my handler code given that events can happen at the same time?
- 4. How do I pass data between handlers in the same Channel?

This FAQ is a summary of questions and answers from StackOverflow.
1. When can I write downstream data?

As long as you have a reference to the Channel (or
ChannelHandlerContext), you can call Channel.write() (or
Channels.write()) from anywhere, from any thread. writeRequested() is
called when you trigger the writeRequested event by calling
Channel.write() or by calling
ChannelHandlerContext.sendDownstream(MessageEvent). See the related
StackOverflow discussion for more detail.
2. How do I incorporate my blocking application code with the non-blocking NioServerSocketChannelFactory?
NioServerSocketChannelFactory uses boss threads and worker threads.
Boss threads are responsible for accepting incoming connections while
worker threads are responsible for performing non-blocking read and
write for the associated channels. The default number of worker threads
in the pool is 2 * the number of available processors.

If your application's handler blocks (such as by reading from a
database) or is CPU intensive, the worker thread pool may be exhausted
and performance will degrade.

We recommend that you implement your blocking application code in
another thread pool. You can do this by adding an
OrderedMemoryAwareThreadPoolExecutor to the channel pipeline before
your handler, or by implementing your own.
public static void main(String[] args) throws Exception {
    OrderedMemoryAwareThreadPoolExecutor eventExecutor =
        new OrderedMemoryAwareThreadPoolExecutor(
            5, 1000000, 10000000, 100, TimeUnit.MILLISECONDS);

    ServerBootstrap bootstrap = new ServerBootstrap(
        new NioServerSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool()));

    bootstrap.setPipelineFactory(new MyPipelineFactory(eventExecutor));
    bootstrap.bind(socketAddress);

    // Other code

    return;
}

public class MyPipelineFactory implements ChannelPipelineFactory {

    private final Executor pipelineExecutor;

    public MyPipelineFactory(Executor pipelineExecutor) {
        this.pipelineExecutor = pipelineExecutor;
    }

    @Override
    public ChannelPipeline getPipeline() throws Exception {
        // Create a default pipeline implementation.
        ChannelPipeline pipeline = pipeline();

        pipeline.addLast("decoder", new HttpRequestDecoder());
        pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
        pipeline.addLast("encoder", new HttpResponseEncoder());
        pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());

        // Insert an ExecutionHandler backed by the thread pool before your blocking handler.
        pipeline.addLast("pipelineExecutor", new ExecutionHandler(pipelineExecutor));

        // MyHandler contains code that blocks.
        pipeline.addLast("handler", new MyHandler());

        return pipeline;
    }
}

public class MyHandler extends SimpleChannelUpstreamHandler {
    // Your blocking application code
}
3. Do I need to synchronize my handler code given that events can happen at the same time?

Your ChannelUpstreamHandler will be invoked sequentially by the same
thread (i.e. an I/O thread), and therefore a handler does not need to
worry about being invoked with a new upstream event before the previous
upstream event is finished.

However, downstream events can be fired by more than one thread
simultaneously. If your ChannelDownstreamHandler accesses a shared
resource or stores stateful information, you might need proper
synchronization. See the related StackOverflow discussion for more
detail.