Net
Overview
A convenient and efficient alternative to Netty, Apache Mina, and other similar solutions for high performance networking. A tiny layer of abstraction on top of reactor (
Eventloop
) and Java NIO Adapters for
TCP socket
along with
UDP socket
Features
Promises
for read and write operations
CSP
ChannelSupplier
and
ChannelConsumer
TcpSocket can operate as a CSP channel with built-in backpressure propagation, and can be plugged into
CSP
/
Datastream
pipeline with all its features (such as buffering, compression, serialization/deserialization, data transformations, data filtering, reducing, etc.)
ByteBufPool
Reactive sockets
Fully reactive
TCP socket
with TLS support. Allows you to send/receive data to/from the network. It can be used as a building block for creating custom TCP servers/clients
or implementing custom networking protocols. Socket has a simple and intuitive API consisting of
read
/
write
methods.
The
CSP
module can be used to wrap a socket into a
ChannelSupplier
or
ChannelConsumer
There is also an asynchronous
UDP socket
for UDP communications.
Server
The
AbstractReactiveServer
class is the basis for building reactor-based TCP servers (HTTP servers, RPC servers, TCP file services, etc.):
ReactiveServer
WorkerServer
interface, so all
AbstractReactiveServer
subclasses can be used as worker servers right away
ServerSocketSettings
and
SocketSettings
A ready-to-use
PrimaryServer
implementation that works in primary reactor as a balancer. It takes external “accept” requests and redistributes them to the WorkerServers, which then execute the actual “accept” requests in their corresponding worker reactor threads.
Examples
To run the examples, you need to clone ActiveJ from GitHub
git clone https://github.com/activej/activej
And import it as a Maven project. Check out tag
v6.0-beta2
. Before running the examples, build the project.
These examples are located at
activej/examples/core/net
Ping-Pong Socket Connection
In this example we will use the
AbstractReactiveServer
implementation,
SimpleServer
, which receives a message and sends a response (
PONG
). We will also use
TCP socket
as a client to send 3 request messages (
PING
).
public static void main(String[] args) throws IOException {
Eventloop eventloop = Eventloop.builder()
.withCurrentThread()
.build();
SimpleServer server = SimpleServer.builder(
eventloop,
socket -> {
BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSuppliers.ofSocket(socket));
repeat(() ->
bufsSupplier.decode(DECODER)
.whenResult(x -> System.out.println(x))
.then(() -> socket.write(wrapAscii(RESPONSE_MSG)))
.map($ -> true))
.whenComplete(socket::close);
})
.withListenAddress(ADDRESS)
.withAcceptOnce()
.build();
server.listen();
TcpSocket.connect(eventloop, ADDRESS)
.whenResult(socket -> {
BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSuppliers.ofSocket(socket));
loop(0,
i -> i < ITERATIONS,
i -> socket.write(wrapAscii(REQUEST_MSG))
.then(() -> bufsSupplier.decode(DECODER)
.whenResult(x -> System.out.println(x))
.map($2 -> i + 1)))
.whenComplete(socket::close);
})
.whenException(e -> {throw new RuntimeException(e);});
eventloop.run();
}
CSP TCP Client
A simple TCP console client that connects to a TCP server:
private void run() {
System.out.println("Connecting to server at localhost (port 9922)...");
eventloop.connect(new InetSocketAddress("localhost", 9922), (socketChannel, e) -> {
if (e == null) {
System.out.println("Connected to server, enter some text and send it by pressing 'Enter'.");
ITcpSocket socket;
try {
socket = TcpSocket.wrapChannel(getCurrentReactor(), socketChannel, null);
} catch (IOException ioException) {
throw new RuntimeException(ioException);
}
BinaryChannelSupplier.of(ChannelSuppliers.ofSocket(socket))
.decodeStream(ByteBufsDecoders.ofCrlfTerminatedBytes())
.streamTo(ChannelConsumers.ofConsumer(buf -> System.out.println(buf.asString(UTF_8))));
startCommandLineInterface(socket);
} else {
System.out.printf("Could not connect to server, make sure it is started: %s%n", e);
}
});
eventloop.run();
}
public static void main(String[] args) {
new TcpClientExample().run();
}
It sends characters, receives some data back through the CSP channel, parses it and then outputs it to the console.
CSP TCP Server
A simple TCP echo server running in an eventloop:
public static void main(String[] args) throws Exception {
Eventloop eventloop = Eventloop.builder()
.withCurrentThread()
.build();
SimpleServer server = SimpleServer.builder(eventloop, socket ->
BinaryChannelSupplier.of(ChannelSuppliers.ofSocket(socket))
.decodeStream(ByteBufsDecoders.ofCrlfTerminatedBytes())
.peek(buf -> System.out.println("client:" + buf.getString(UTF_8)))
.map(buf -> {
ByteBuf serverBuf = ByteBufStrings.wrapUtf8("Server> ");
return ByteBufPool.append(serverBuf, buf);
})
.map(buf -> ByteBufPool.append(buf, CRLF))
.streamTo(ChannelConsumers.ofSocket(socket)))
.withListenPort(PORT)
.build();
server.listen();
System.out.println("Server is running");
System.out.println("You can connect from telnet with command: telnet localhost 9922 or by running csp.TcpClientExample");
eventloop.run();
}
This server listens for connections, and when a client connects, it parses its message and sends it back as a CSP channel via socket.
Datastream TCP Client
graph TB id3-.->id4 subgraph Client id1(produce ints)-->id2(serialize them into bytes) id2-->id3(send bytes over the network) id9(receive those bytes from network)-->id10(deserialize bytes into strings) id10 --> id11(collect those strings in a list) id8-.->id9 subgraph Server id4(receive bytes from network)-->id5(deserialize bytes into ints) id5-->id6(compute strings from those ints somehow) id6-->id7(serialize strings into bytes) id7-->id8(send those bytes over the network)This image illustrates the communication and transformations between TCP client and a server that use Datastream to process data.
public final class TcpClientExample {
public static final int PORT = 9922;
public static void main(String[] args) {
Eventloop eventloop = Eventloop.builder()
.withFatalErrorHandler(rethrow())
.build();
eventloop.connect(new InetSocketAddress("localhost", PORT), (socketChannel, e) -> {
if (e == null) {
ITcpSocket socket;
try {
socket = TcpSocket.wrapChannel(eventloop, socketChannel, null);
} catch (IOException ioEx) {
throw new RuntimeException(ioEx);
}
StreamSuppliers.ofValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.transformWith(ChannelSerializer.create(INT_SERIALIZER))
.streamTo(ChannelConsumers.ofSocket(socket));
ChannelSuppliers.ofSocket(socket)
.transformWith(ChannelDeserializer.create(UTF8_SERIALIZER))
.toList()
.whenResult(list -> list.forEach(System.out::println));
} else {
System.out.printf("Could not connect to server, make sure it is started: %s%n", e);
}
});
eventloop.run();
}
}