添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
  • 1.1. Quickstart
  • 1.2. Using Reactive Streams Operators in a Vert.x application
  • 1.3. Using Reactive Streams Operators in a Camel application
  • 2. Operators
  • 2.1. Creating streams
  • Creating empty streams
  • Creating streams from elements
  • Creating failing streams
  • Creating streams from CompletionStage
  • Creating streams from collections
  • Wrapping a Reactive Stream Publisher
  • Generating infinite streams
  • 2.2. Processing streams
  • Creating a processor
  • Filtering elements
  • Composing asynchronous actions
  • Transforming items
  • Combining a Processor
  • 2.3. Action operators
  • 2.4. Error management operators
  • Terminal operator and computing asynchronous result
  • Cancelling a stream
  • Ignoring elements
  • Collecting results
  • Get the first item of a stream
  • Execute a method for each element
  • Pass to a Reactive Streams Subscriber
  • IMPORTANT: Project in maintenance mode

    This repository is in maintenance mode. No new features will be implemented.

    Another implementation of MicroProfile Reactive Streams Operators is available in Mutiny . It is strongly recommended to switch to this implementation.

    Reactive Converters have been migrated to https://github.com/smallrye/smallrye-reactive-utils .

    If you have any questions, send a message to https://groups.google.com/forum/#!forum/smallrye .

    The idea behind the specification is to provide the equivalent of java.util.stream however, for Reactive Stream, so, inherently asynchronous, supporting back-pressure and with error and completion signals propagation. The following code snippet shows how close the API is:

    // java.util.stream version:
    Stream.of("hello", "world")
            .filter(word -> word.length() <= 5)
            .map(String::toUpperCase)
            .findFirst()
            .ifPresent(s -> System.out.println("Regular Java stream result: " + s));
    // reactive stream operator version:
    ReactiveStreams.of("hello", "world")
            .filter(word -> word.length() <= 5)
            .map(String::toUpperCase)
            .findFirst()
            .run() // Run the stream (start publishing)
            // Retrieve the result asynchronously, using a CompletionStage
            .thenAccept(res -> res
                    .ifPresent(s -> System.out.println("Reactive Stream result: " + s)));

    The SmallRye implementation is based on RX Java 2 .

    1.1. Quickstart

    The easiest to start using SmallRye Reactive Stream Operators is to start it directly in a main class. You only need to put smallrye-reactive-streams-operators in your CLASSPATH to use it.

    Creates a Maven project, and include the following dependency in your pom.xml:

    <dependency>
        <groupId>io.smallrye.reactive</groupId>
        <artifactId>smallrye-reactive-streams-operators</artifactId>
        <version>1.0.8-SNAPSHOT</version>
    </dependency>

    Once created, create a class file with a public static void main(String…​ args) method:

    package io.smallrye.reactive.operators.quickstart;
    import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
    public class QuickStart {
        public static void main(String[] args) {
            // Create a stream of words
            ReactiveStreams.of("hello", "from", "smallrye", "reactive", "stream", "operators")
                    .map(String::toUpperCase) // Transform the words
                    .filter(s -> s.length() > 4) // Filter items
                    .forEach(word -> System.out.println(">> " + word)) // Terminal operation
                    .run(); // Run it (create the streams, subscribe to it...)
                        

    Once everything is set up, you should be able to run the application using:

    mvn compile exec:java -Dexec.mainClass=io.smallrye.reactive.operators.quickstart.QuickStart

    Running the previous example should give the following output:

    >> HELLO
    >> SMALLRYE
    >> REACTIVE
    >> STREAM
    >> OPERATORS

    The Reactive Streams Operator is intended to be used in other software and not as a standalone api. However, to give you a better overview the 2 following quickstart explains how to use it in Eclipse Vert.x applications and Apache Camel applications. import io.vertx.core.Future; import io.vertx.core.json.JsonObject; import io.vertx.reactivex.core.AbstractVerticle; import io.vertx.reactivex.core.eventbus.Message; import io.vertx.reactivex.core.eventbus.MessageConsumer; import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; public class DataProcessor extends AbstractVerticle { private static final int PORT = 8080; @Override public void start(Future<Void> done) { vertx.createHttpServer() .requestHandler(request -> { // Consume messages from the Vert.x event bus MessageConsumer<JsonObject> consumer = vertx.eventBus().consumer("data"); // Wrap the stream and manipulate the data ReactiveStreams.fromPublisher(consumer.toFlowable()) .limit(5) // Take only 5 messages .map(Message::body) // Extract the body .map(json -> json.getInteger("value")) // Extract the value .peek(i -> System.out.println("Got value: " + i)) // Print it .reduce(0, (acc, value) -> acc + value) .run() // Begin to receive items .whenComplete((res, err) -> { // When the 5 items has been consumed, write the result to the // HTTP response: if (err != null) { request.response().setStatusCode(500).end(err.getMessage()); } else { request.response().end("Result is: " + res); .listen(PORT, ar -> done.handle(ar.mapEmpty()));

    This example creates an HTTP server and for each request to collect 5 messages sent by another component on the Vert.x event bus. It computes the sum of these 5 elements and writes the result to the HTTP response. It’s important to notice that the messages coming from the event bus are sent asynchronously. So, it would not be possible to write the previous code using java.util.streams.

    When used in a Vert.x application, Reactive Stream Operators can be used to processed data and compute an asynchronous result.

    1.3. Using Reactive Streams Operators in a Camel application

    Apache Camel is a toolkit to define routing and mediation rules, mainly used to integrate systems, using enterprise integration patterns. Apache Camel provides more than 200+ components so that it can integrate virtually with anything.

    You can combine Reactive Stream Operators and Apache Camel thanks to the Camel Reactive Stream Component.

    package io.smallrye.reactive.operators.quickstart;
    import org.apache.camel.CamelContext;
    import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
    import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
    import org.apache.camel.impl.DefaultCamelContext;
    import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
    import org.reactivestreams.Subscriber;
    import java.io.File;
    import java.nio.file.Files;
    public class QuickStart {
        public static void main(String[] args) throws Exception {
            CamelContext context = new DefaultCamelContext();
            CamelReactiveStreamsService camel = CamelReactiveStreams.get(context);
            Subscriber<String> subscriber = camel
                    .subscriber("file:./target?fileName=values.txt&fileExist=append", String.class);
            ReactiveStreams.of("hello", "from", "smallrye", "reactive", "stream", "operators",
                    "using", "Apache", "Camel")
                    .map(String::toUpperCase) // Transform the words
                    .filter(s -> s.length() > 4) // Filter items
                    .peek(System.out::println)
                    .map(s -> s + " ")
                    .to(subscriber)
                    .run();
            context.start();
            // Just wait until it's done.
            Thread.sleep(1000);
            Files.readAllLines(new File("target/values.txt").toPath())
                    .forEach(s -> System.out.println("File >> " + s));
                        

    You can also use Camel to create Reactive Streams Publisher and transform the items using Reactive Streams Operators.

    2. Operators

    As mentioned before, the Reactive Streams API is an asynchronous version of java.util.stream for Reactive Streams. This section list the operators that are provided.

    The Reactive Streams Operators introduce a set of types to allow creating Reactive Streams:

    Besides, the Reactive Streams Operators introduce CompletionRunner that triggers the emission of the items and provides a way to retrieve the asynchronously computed result.

    2.1. Creating streams

    The first part of the API allows to create PublisherBuilder. A Reactive Streams Publisher can be created from the builder using the .buildRS method.

    2.1.1. Creating empty streams

    Operator: empty

    Description: Creates an empty stream.

            PublisherBuilder<T> streamOfOne = ReactiveStreams.of(t1);
            PublisherBuilder<T> streamOfThree = ReactiveStreams.of(t1, t2, t3);
            PublisherBuilder<T> streamOfOneOrEmpty = ReactiveStreams.ofNullable(maybeNull);
            PublisherBuilder<T> streamOfOne = ReactiveStreams.fromCompletionStage(cs);
            // If the redeemed value is `null`, an error is propagated in the stream.
            PublisherBuilder<T> streamOfOne = ReactiveStreams.fromCompletionStageNullable(cs);
            // If the redeemed value is `null`, the stream is completed immediately.
            AtomicInteger counter = new AtomicInteger();
            PublisherBuilder<Integer> stream = ReactiveStreams
                    .generate(() -> counter.getAndIncrement());
            // The resulting stream is an infinite stream.
            PublisherBuilder<Integer> stream = ReactiveStreams
                    .iterate(0, last -> last + 1);
            // The resulting stream is an infinite stream.

    distinct - remove similar element (Attention: do not use on large or unbounded streams)

    dropWhile - drop elements until the predicate returns true

    skip - ignore x elements

    takeWhile - forward elements until the predicate returns true

    limit - pick x elements

    flatMapCompletionStage - Produces a CompletionStage. When completed, the result is passed to the returned stream.

    flatMapIterable - Produces an Iterable and flatten the element into the returned stream. This flatMap method is not asynchronous.

    flatMapRSPublisher - Like flatMap but return a Reactive Streams Publisher

            ProcessorBuilder<Integer, String> processor = ReactiveStreams
                    .<Integer>builder().map(i -> Integer.toString(i));
            ReactiveStreams.of(1, 2)
                    .via(processor); // ("1", "2")

    2.4. Error management operators

    These operators allow recovering after a failure. Because you handle asynchronous streams of data, you can’t use try/catch, so these operators provide a similar feature.

    2.4.1. Terminal operator and computing asynchronous result

    These operators act as subscribers and produce a result. As the result is computed asynchronously, you retrieve a CompletionStage object.

    2.4.2. Cancelling a stream

    Operator: cancel

    Description: Cancel the subscription to a stream. No more items will be received.

    ReactiveStreams.of(1, 2, 3) .collect(() -> new AtomicInteger(1), AtomicInteger::addAndGet) .run() // Produce 7 .thenAccept(res -> System.out.println("Result is: " + res)); ReactiveStreams.of(1, 2, 3) .reduce((acc, item) -> acc + item) .run() // Produce Optional(6) .thenAccept(res -> res.ifPresent(sum -> System.out.println("Result is: " + sum))); ReactiveStreams.of(1, 2, 3) .toList() .run() // Produce [1, 2, 3] .thenAccept(res -> System.out.println("Result is: " + res));

    Description: Forward the elements of a stream to a given Subscriber or SubscriberBuilder.

    Example:

            SubscriberBuilder<Integer, Optional<Integer>> subscriber
                    = ReactiveStreams.<Integer>builder()
                        .map(i -> i + 1)
                        .findFirst();
            ReactiveStreams.of(1, 2, 3)
                    .to(subscriber)
                    .run()
                    // Produce Optional[2]
                    .thenAccept(optional ->
                            optional.ifPresent(i -> System.out.println("Result: " + i)));

    3. Execution Model

    SmallRye Reactive Stream Operators provides a way to control on which thread are the different callbacks invoked. By default it uses the caller thread.

    If you are building a Vert.x application, add the following dependency to your project so enforce the Vert.x execution model:

    <dependency>
      <groupId>io.smallrye</groupId>
        <artifactId>smallrye-reactive-streams-vertx-execution-model</artifactId>
      <version>1.0.8-SNAPSHOT</version>
    </dependency>

    With this dependency, if you are calling ReactiveStreams.x from a Vert.x thread, the same thread is used to call the different callbacks and pass the result.

    4. Reactive Type Converters

    The reactive type converters are a set of modules not directly related to MicroProfile Reactive Streams Operators. These converters adapts reactive types from different reactive programming libraries. The main interface is:

    public interface ReactiveTypeConverter<T> {
        <X> CompletionStage<X> toCompletionStage(T instance);
        <X> Publisher<X> toRSPublisher(T instance);
        <X> T fromCompletionStage(CompletionStage<X> cs);
        <X> T fromPublisher(Publisher<X> publisher);
        // ...

    You can use converters to convert types provided by different reactive programming libraries to Publisher and CompletionStage, and the opposite:

    <dependency>
      <groupId>io.smallrye</groupId>
      <artifactId>smallrye-reactive-converter-api</artifactId>
      <version>1.0.8-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>io.smallrye</groupId>
        <artifactId>smallrye-reactive-converter-reactive-streams-operators</artifactId>
        <version>1.0.8-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>io.smallrye</groupId>
        <artifactId>smallrye-reactive-converter-reactor</artifactId>
        <version>1.0.8-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>io.smallrye</groupId>
        <artifactId>smallrye-reactive-converter-rxjava1</artifactId>
        <version>1.0.8-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>io.smallrye</groupId>
        <artifactId>smallrye-reactive-converter-rxjava2</artifactId>
        <version>1.0.8-SNAPSHOT</version>
    </dependency>
    CompletionStage cs = ...
    ReactiveTypeConverter<Completable> converter = Registry.lookup(Completable.class)
        .orElseThrow(() -> new AssertionError("Completable converter should be found"));
    Completable converted = converter.fromCompletionStage(cs);

    The conversion rules are detailed in the javadoc.