package io.smallrye.reactive.operators.quickstart;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
public class QuickStart {
public static void main(String[] args) {
// Create a stream of words
ReactiveStreams.of("hello", "from", "smallrye", "reactive", "stream", "operators")
.map(String::toUpperCase) // Transform the words
.filter(s -> s.length() > 4) // Filter items
.forEach(word -> System.out.println(">> " + word)) // Terminal operation
.run(); // Run it (create the streams, subscribe to it...)
Once everything is set up, you should be able to run the application using:
mvn compile exec:java -Dexec.mainClass=io.smallrye.reactive.operators.quickstart.QuickStart
Running the previous example should give the following output:
>> HELLO
>> SMALLRYE
>> REACTIVE
>> STREAM
>> OPERATORS
The Reactive Streams Operator is intended to be used in other software and not as a standalone api. However, to give you a better overview the 2 following quickstart explains how to use it in Eclipse Vert.x applications and
Apache Camel applications.
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.eventbus.Message;
import io.vertx.reactivex.core.eventbus.MessageConsumer;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
public class DataProcessor extends AbstractVerticle {
private static final int PORT = 8080;
@Override
public void start(Future<Void> done) {
vertx.createHttpServer()
.requestHandler(request -> {
// Consume messages from the Vert.x event bus
MessageConsumer<JsonObject> consumer = vertx.eventBus().consumer("data");
// Wrap the stream and manipulate the data
ReactiveStreams.fromPublisher(consumer.toFlowable())
.limit(5) // Take only 5 messages
.map(Message::body) // Extract the body
.map(json -> json.getInteger("value")) // Extract the value
.peek(i -> System.out.println("Got value: " + i)) // Print it
.reduce(0, (acc, value) -> acc + value)
.run() // Begin to receive items
.whenComplete((res, err) -> {
// When the 5 items has been consumed, write the result to the
// HTTP response:
if (err != null) {
request.response().setStatusCode(500).end(err.getMessage());
} else {
request.response().end("Result is: " + res);
.listen(PORT, ar -> done.handle(ar.mapEmpty()));
This example creates an HTTP server and for each request to collect 5 messages sent by another component on the Vert.x event bus. It computes the sum of these 5 elements and writes the result to the HTTP response. It’s important
to notice that the messages coming from the event bus are sent asynchronously. So, it would not be possible to write the previous code using java.util.streams.
When used in a Vert.x application, Reactive Stream Operators can be used to processed data and compute an asynchronous result.
1.3. Using Reactive Streams Operators in a Camel application
Apache Camel is a toolkit to define routing and mediation rules, mainly used to integrate systems, using enterprise integration patterns. Apache Camel provides more than 200+ components so that
it can integrate virtually with anything.
You can combine Reactive Stream Operators and Apache Camel thanks to the
Camel Reactive Stream Component.
package io.smallrye.reactive.operators.quickstart;
import org.apache.camel.CamelContext;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.impl.DefaultCamelContext;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Subscriber;
import java.io.File;
import java.nio.file.Files;
public class QuickStart {
public static void main(String[] args) throws Exception {
CamelContext context = new DefaultCamelContext();
CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
Subscriber<String> subscriber = camel
.subscriber("file:./target?fileName=values.txt&fileExist=append", String.class);
ReactiveStreams.of("hello", "from", "smallrye", "reactive", "stream", "operators",
"using", "Apache", "Camel")
.map(String::toUpperCase) // Transform the words
.filter(s -> s.length() > 4) // Filter items
.peek(System.out::println)
.map(s -> s + " ")
.to(subscriber)
.run();
context.start();
// Just wait until it's done.
Thread.sleep(1000);
Files.readAllLines(new File("target/values.txt").toPath())
.forEach(s -> System.out.println("File >> " + s));
You can also use Camel to create Reactive Streams Publisher
and transform the items using Reactive Streams Operators.
2. Operators
As mentioned before, the Reactive Streams API is an asynchronous version of java.util.stream
for Reactive Streams. This section list the operators that are provided.
The Reactive Streams Operators introduce a set of types to allow creating Reactive Streams:
Besides, the Reactive Streams Operators introduce CompletionRunner
that triggers the emission of the items and provides a way to retrieve the asynchronously computed result.
2.1. Creating streams
The first part of the API allows to create PublisherBuilder
. A Reactive Streams Publisher
can be created from the builder using the .buildRS
method.
2.1.1. Creating empty streams
Operator: empty
Description: Creates an empty stream.
PublisherBuilder<T> streamOfOne = ReactiveStreams.of(t1);
PublisherBuilder<T> streamOfThree = ReactiveStreams.of(t1, t2, t3);
PublisherBuilder<T> streamOfOneOrEmpty = ReactiveStreams.ofNullable(maybeNull);
PublisherBuilder<T> streamOfOne = ReactiveStreams.fromCompletionStage(cs);
// If the redeemed value is `null`, an error is propagated in the stream.
PublisherBuilder<T> streamOfOne = ReactiveStreams.fromCompletionStageNullable(cs);
// If the redeemed value is `null`, the stream is completed immediately.
AtomicInteger counter = new AtomicInteger();
PublisherBuilder<Integer> stream = ReactiveStreams
.generate(() -> counter.getAndIncrement());
// The resulting stream is an infinite stream.
PublisherBuilder<Integer> stream = ReactiveStreams
.iterate(0, last -> last + 1);
// The resulting stream is an infinite stream.
distinct - remove similar element (Attention: do not use on large or unbounded streams)
dropWhile - drop elements until the predicate returns true
skip - ignore x elements
takeWhile - forward elements until the predicate returns true
limit - pick x elements
flatMapCompletionStage - Produces a CompletionStage
. When completed, the result is passed to the returned stream.
flatMapIterable - Produces an Iterable
and flatten the element into the returned stream. This flatMap
method is not asynchronous.
flatMapRSPublisher - Like flatMap
but return a Reactive Streams Publisher
ProcessorBuilder<Integer, String> processor = ReactiveStreams
.<Integer>builder().map(i -> Integer.toString(i));
ReactiveStreams.of(1, 2)
.via(processor); // ("1", "2")
2.4. Error management operators
These operators allow recovering after a failure. Because you handle asynchronous streams of data, you can’t use
try/catch
, so these operators provide a similar feature.
2.4.1. Terminal operator and computing asynchronous result
These operators act as subscribers and produce a result. As the result is computed asynchronously, you retrieve a
CompletionStage
object.
2.4.2. Cancelling a stream
Operator: cancel
Description: Cancel the subscription to a stream. No more items will be received.
ReactiveStreams.of(1, 2, 3)
.collect(() -> new AtomicInteger(1), AtomicInteger::addAndGet)
.run()
// Produce 7
.thenAccept(res -> System.out.println("Result is: " + res));
ReactiveStreams.of(1, 2, 3)
.reduce((acc, item) -> acc + item)
.run()
// Produce Optional(6)
.thenAccept(res -> res.ifPresent(sum ->
System.out.println("Result is: " + sum)));
ReactiveStreams.of(1, 2, 3)
.toList()
.run()
// Produce [1, 2, 3]
.thenAccept(res -> System.out.println("Result is: " + res));
Description: Forward the elements of a stream to a given Subscriber
or SubscriberBuilder
.
Example:
SubscriberBuilder<Integer, Optional<Integer>> subscriber
= ReactiveStreams.<Integer>builder()
.map(i -> i + 1)
.findFirst();
ReactiveStreams.of(1, 2, 3)
.to(subscriber)
.run()
// Produce Optional[2]
.thenAccept(optional ->
optional.ifPresent(i -> System.out.println("Result: " + i)));
3. Execution Model
SmallRye Reactive Stream Operators provides a way to control on which thread are the different callbacks invoked. By default it uses the caller thread.
If you are building a Vert.x application, add the following dependency to your project so enforce the Vert.x execution model:
<dependency>
<groupId>io.smallrye</groupId>
<artifactId>smallrye-reactive-streams-vertx-execution-model</artifactId>
<version>1.0.8-SNAPSHOT</version>
</dependency>
With this dependency, if you are calling ReactiveStreams.x
from a Vert.x thread, the same thread is used to call the different callbacks and pass the result.
4. Reactive Type Converters
The reactive type converters are a set of modules not directly related to MicroProfile Reactive Streams Operators. These converters adapts reactive types from different reactive programming libraries. The main interface is:
public interface ReactiveTypeConverter<T> {
<X> CompletionStage<X> toCompletionStage(T instance);
<X> Publisher<X> toRSPublisher(T instance);
<X> T fromCompletionStage(CompletionStage<X> cs);
<X> T fromPublisher(Publisher<X> publisher);
// ...
You can use converters to convert types provided by different reactive programming libraries to Publisher
and
CompletionStage
, and the opposite:
<dependency>
<groupId>io.smallrye</groupId>
<artifactId>smallrye-reactive-converter-api</artifactId>
<version>1.0.8-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.smallrye</groupId>
<artifactId>smallrye-reactive-converter-reactive-streams-operators</artifactId>
<version>1.0.8-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.smallrye</groupId>
<artifactId>smallrye-reactive-converter-reactor</artifactId>
<version>1.0.8-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.smallrye</groupId>
<artifactId>smallrye-reactive-converter-rxjava1</artifactId>
<version>1.0.8-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.smallrye</groupId>
<artifactId>smallrye-reactive-converter-rxjava2</artifactId>
<version>1.0.8-SNAPSHOT</version>
</dependency>
CompletionStage cs = ...
ReactiveTypeConverter<Completable> converter = Registry.lookup(Completable.class)
.orElseThrow(() -> new AssertionError("Completable converter should be found"));
Completable converted = converter.fromCompletionStage(cs);
The conversion rules are detailed in the javadoc.