Project Reactor

https://zbciok.github.io/docs/project-reactor

https://projectreactor.io/docs/core/release/reference/#about-doc
https://eherrera.net/project-reactor-course/

Reactor is a fully non-blocking reactive programming foundation for the JVM, with efficient demand management (in the form of managing “backpressure”). It integrates directly with the Java 8 functional APIs, notably CompletableFuture, Stream, and Duration. It offers composable asynchronous sequence APIs, Flux (for [N] elements) and Mono (for [0|1] elements), and extensively implements the Reactive Streams specification.

Reactor also supports non-blocking inter-process communication with the reactor-netty project. Suited for Microservices Architecture, Reactor Netty offers backpressure-ready network engines for HTTP (including Websockets), TCP, and UDP. Reactive encoding and decoding are fully supported.

Maven Installation

First, you need to import the BOM by adding the following snippet to your pom.xml:

<dependencyManagement> 
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2022.0.12</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Next, add the Reactor dependencies you need, omitting the <version> element because the BOM manages it:

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId> 
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId> 
        <scope>test</scope>
    </dependency>
</dependencies>

Reactive Streams

Reactive Streams is a standard for asynchronous stream processing with non-blocking backpressure. Since Java 9, its interfaces have been part of the JDK as the java.util.concurrent.Flow.* interfaces. The Reactive Streams Specification is the set of rules that an implementation must follow when building a reactive stream.

Generalized Streams

In a generalized stream pipeline, data flows from the producer, through the processing stages, to the consumer.

If you consider that the components above can have different processing speeds, there are two possible scenarios:

  1. If the downstream (i.e. the component that receives data) is faster than the upstream (the component that sends data), you’re all good, since the pipeline should be working smoothly.
  2. If, however, the upstream is faster, then the downstream becomes flooded with data and things start getting worse.

In the latter case, there are a few strategies for dealing with the excess data:

  1. Buffer it — but buffers have limited capacity and you’re going to run out of memory sooner or later.
  2. Drop it — but then you lose data (which is usually not desired, but can make sense in certain cases — e.g. this is what networking hardware often does).
  3. Block until the consumer is done with it — but this may result in slowing down the entire pipeline.
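
The three strategies above can be sketched with plain JDK queues. This is only an illustration under stated assumptions: the class and method names (OverflowStrategies, bufferAll, dropWhenFull, blockWhenFull) are hypothetical, and the example uses java.util.concurrent rather than Reactor.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of Reactor or of the original examples.
class OverflowStrategies {

    // Strategy 1: buffer everything. The queue grows with the backlog.
    static int bufferAll(int produced) {
        Queue<Integer> buffer = new ArrayDeque<>();
        for (int i = 0; i < produced; i++) {
            buffer.add(i);
        }
        return buffer.size();
    }

    // Strategy 2: drop what does not fit. offer() returns false when the queue is full.
    static int dropWhenFull(int produced, int capacity) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        int dropped = 0;
        for (int i = 0; i < produced; i++) {
            if (!buffer.offer(i)) {
                dropped++;
            }
        }
        return dropped;
    }

    // Strategy 3: block the producer. put() waits until the consumer frees a slot.
    static int blockWhenFull(int produced, int capacity) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        AtomicInteger consumed = new AtomicInteger();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < produced; i++) {
                    buffer.take();
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 0; i < produced; i++) {
                buffer.put(i); // waits while the buffer is full
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("buffered: " + bufferAll(1000));
        System.out.println("dropped:  " + dropWhenFull(1000, 10));
        System.out.println("consumed: " + blockWhenFull(1000, 10));
    }
}
```

With 1000 items and a capacity of 10, the buffering variant holds all 1000 items in memory, the dropping variant loses 990 of them because nothing drains the queue, and the blocking variant delivers all 1000 but the producer can never run more than 10 items ahead of the consumer.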

The preferred way of dealing with those different processing capabilities is a technique called backpressure — which boils down to the slower consumer requesting a given amount of data from the faster producer — but only an amount the consumer is able to process at that time.

Coming back to the streaming pipeline, you can think of backpressure as a special kind of signalling data that flows in the opposite direction to the regular data being processed.
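
The demand signal flowing upstream can be sketched with the JDK's own java.util.concurrent.Flow types, which mirror the Reactive Streams interfaces. This is a minimal sketch, not Reactor code: SubmissionPublisher is the JDK's basic publisher, and BackpressureDemo and OneAtATimeSubscriber are hypothetical names chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; uses the JDK Flow API as a stand-in for Reactor.
class BackpressureDemo {

    // A consumer that asks for exactly one item at a time.
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // demand signal travels upstream
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ready for one more item
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    static List<Integer> run(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks only if the subscriber's buffer is saturated
            }
        } // close() signals onComplete once the submitted items have been delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

The publisher never pushes more than the subscriber has requested: each onNext is preceded by a request(1), so the consumer sets the pace.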

Reactive Streams

Key factors that make a stream reactive:

  • the data is processed asynchronously,
  • the backpressure mechanism is non-blocking,
  • the fact that the downstream can be slower than the upstream is somehow represented in the domain model.

Reactive Streams Specification

The specification defines four interfaces that you implement when creating a reactive stream:

Publisher

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Subscriber

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

Subscription

public interface Subscription {
    public void request(long n);
    public void cancel();
}

Processor

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
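
Because a Processor is both a Subscriber and a Publisher, it sits in the middle of a pipeline. The sketch below follows the pattern shown in the JDK's SubmissionPublisher documentation and uses java.util.concurrent.Flow instead of the org.reactivestreams interfaces. The names ProcessorDemo, MapProcessor, and CollectingSubscriber are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical demo class; uses the JDK Flow API as a stand-in for Reactor.
class ProcessorDemo {

    // Subscriber on its input side, Publisher on its output side.
    static final class MapProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<? super T, ? extends R> mapper;
        private Flow.Subscription upstream;

        MapProcessor(Function<? super T, ? extends R> mapper) {
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            s.request(1);
        }

        @Override
        public void onNext(T item) {
            submit(mapper.apply(item)); // republish the transformed item downstream
            upstream.request(1);
        }

        @Override
        public void onError(Throwable t) {
            closeExceptionally(t);
        }

        @Override
        public void onComplete() {
            close();
        }
    }

    // Collects everything it receives and releases a latch on termination.
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> items = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription s) {
            s.request(Long.MAX_VALUE); // unbounded demand
        }

        @Override
        public void onNext(T item) {
            items.add(item);
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    static List<String> run(int count) {
        MapProcessor<Integer, String> processor =
                new MapProcessor<Integer, String>(i -> "item-" + i);
        CollectingSubscriber<String> sink = new CollectingSubscriber<>();
        processor.subscribe(sink); // wire the downstream side first
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            source.subscribe(processor);
            for (int i = 1; i <= count; i++) {
                source.submit(i);
            }
        } // closing the source completes the processor, which completes the sink
        try {
            sink.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sink.items;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

In Reactor you rarely write a Processor by hand; operators such as map() play this role. The sketch only shows how the two halves of the interface fit together.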

Reactive Programming Libraries

A reactive library is an implementation of the Reactive Streams interfaces. Here are some reactive libraries that are available to us:

  • Project Reactor 
  • RxJava 
  • JDK 9 Flow (the java.util.concurrent.Flow interfaces)

Project Reactor

Project Reactor is a fourth-generation reactive library that implements the Reactive Streams specification for building non-blocking applications on the JVM.

Reactor provides two implementations of the Publisher interface:

  1. Mono
  2. Flux

Mono: emits 0 or 1 element.
A Mono produces at most one value.

Flux: emits 0…N elements.
A Flux can produce multiple values and can be endless.

Reactive Stream Workflow

Reactive stream workflow:
1. The Subscriber is registered with the Publisher through the Publisher's subscribe() method.

2. The Publisher creates a Subscription and passes it to the Subscriber by calling onSubscribe(), which confirms that the subscription succeeded.

3. The Subscriber calls the request(n) method of the Subscription to request data from the Publisher.

4. The Publisher calls onNext(data) to send each item to the Subscriber, at most n times. For example, if 10 items were requested and are available, the Publisher calls onNext(data) 10 times.

5. Once the Publisher has sent all the data, it calls onComplete() to notify the Subscriber. If an error occurs while sending the data, the Publisher calls onError() instead to pass the error details to the Subscriber.
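
The steps above can be traced with a deliberately simplified, synchronous publisher. This is a sketch under stated assumptions: it uses java.util.concurrent.Flow rather than Reactor, RangePublisher and WorkflowDemo are hypothetical names, and the publisher skips parts of the specification that a real implementation must handle (for example re-entrant requests and non-positive demand).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo class; records the order of the signals in a log.
class WorkflowDemo {

    // Simplified publisher of the integers 1..count. Not fully spec compliant.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;
        private final List<String> log;

        RangePublisher(int count, List<String> log) {
            this.count = count;
            this.log = log;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            log.add("subscribe");                             // step 1
            subscriber.onSubscribe(new Flow.Subscription() {  // step 2
                private int next = 1;
                private boolean finished;

                @Override
                public void request(long n) {                 // step 3
                    log.add("request(" + n + ")");
                    for (long i = 0; i < n && next <= count && !finished; i++) {
                        subscriber.onNext(next++);            // step 4
                    }
                    if (next > count && !finished) {
                        finished = true;
                        subscriber.onComplete();              // step 5
                    }
                }

                @Override
                public void cancel() {
                    finished = true;
                }
            });
        }
    }

    // Subscribes to 1..count, requests `demand` items once, and returns the signal log.
    static List<String> run(int count, long demand) {
        List<String> log = new ArrayList<>();
        new RangePublisher(count, log).subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                log.add("onSubscribe");
                s.request(demand);
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run(3, 3).forEach(System.out::println);
    }
}
```

Calling run(3, 3) logs subscribe, onSubscribe, request(3), onNext(1), onNext(2), onNext(3), onComplete. Calling run(3, 2) stops after onNext(2) with no onComplete, because the publisher may not emit more than was requested.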

Mono and Flux

Mono

The following image shows how a Mono transforms an item:

A Mono<T> is a specialized Publisher<T> that emits at most one item via the onNext signal then terminates with an onComplete signal (successful Mono, with or without value), or only emits a single onError signal (failed Mono).

Most Mono implementations are expected to immediately call onComplete on their Subscriber after having called onNext. Mono.never() is an outlier: it doesn’t emit any signal, which is not technically forbidden although not terribly useful outside of tests. On the other hand, a combination of onNext and onError is explicitly forbidden.

Mono offers only a subset of the operators that are available for a Flux, and some operators (notably those that combine the Mono with another Publisher) switch to a Flux. For example, Mono#concatWith(Publisher) returns a Flux while Mono#then(Mono) returns another Mono.

Note that you can use a Mono to represent no-value asynchronous processes that only have the concept of completion (similar to a Runnable). To create one, you can use an empty Mono<Void>.

Flux

The following image shows how a Flux transforms items:

A Flux<T> is a standard Publisher<T> that represents an asynchronous sequence of 0 to N emitted items, optionally terminated by either a completion signal or an error. As in the Reactive Streams spec, these three types of signal translate to calls to a downstream Subscriber’s onNext, onComplete, and onError methods.

With this large scope of possible signals, Flux is the general-purpose reactive type. Note that all events, even terminating ones, are optional: no onNext event but an onComplete event represents an empty finite sequence, but remove the onComplete and you have an infinite empty sequence (not particularly useful, except for tests around cancellation). Similarly, infinite sequences are not necessarily empty. For example, Flux.interval(Duration) produces a Flux<Long> that is infinite and emits regular ticks from a clock.

Mono and Flux Examples

Mono

When we subscribe to a Publisher (Mono), it starts emitting signals:

  • onNext: When the Publisher receives a request from the Subscriber, it starts emitting data as a stream of events by invoking the onNext() method of the Subscriber interface.
  • onError: In case of an error, the exception is sent to the Subscriber as an event through the onError() method.
  • onComplete: When the Publisher finishes sending data, it notifies the Subscriber through the onComplete() method.

Nothing happens until we subscribe to the Publisher.

When we call one of the overloaded subscribe() methods of Mono (or Flux), we ask the Publisher to start emitting data.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>zjc.examples</groupId>
    <artifactId>mono-and-flux-xamples</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>2022.0.12</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

Example #1

Create and subscribe to Mono:

package zjc.examples;

import reactor.core.publisher.Mono;

public class Main {
    public static void main(String[] args) {
        // create a Mono
        Mono<String> mono = Mono.just("Hello");

        // subscribe to a Mono
        mono.subscribe();
    }
}

We started consuming the data, but we are not doing anything with it.

Example #2

import reactor.core.publisher.Mono;

class ReactiveJavaTutorial {

    public static void main(String[] args) {

        // create a Mono
        Mono<String> mono = Mono.just("Hello");

        // subscribe to a Mono
        mono.subscribe(data -> System.out.println(data));

    }
}

Output: Hello

This was one simple example of how to subscribe to a Mono using the subscribe(Consumer consumer) method that subscribes a Consumer to the Mono.

Example #3

Let’s subscribe and define what should be triggered for each of the 3 events.

import reactor.core.publisher.Mono;

class ReactiveJavaTutorial {

    public static void main(String[] args) {

        // create a Mono
        Mono<String> mono = Mono.just("Hello");

        // subscribe to a Mono
        mono.subscribe(
                data -> System.out.println(data), // onNext
                err -> System.out.println(err),  // onError
                () -> System.out.println("Completed!") // onComplete
        );

    }
}

Output:

Hello
Completed!

Here we used the subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer) method, which subscribes a Consumer to this Mono that consumes the emitted element, handles errors, and reacts to completion.

Example #4

Let’s invoke the onError signal:

import reactor.core.publisher.Mono;

class ReactiveJavaTutorial {

    public static void main(String[] args) {

        // create a Mono whose supplier fails when it is subscribed to
        Mono<String> mono = Mono.fromSupplier(() -> {
            throw new RuntimeException("Exception occurred!");
        });

        // subscribe to a Mono
        mono.subscribe(
                data -> System.out.println(data), // onNext
                err -> System.out.println("ERROR: " + err),  // onError
                () -> System.out.println("Completed!") // onComplete
        );

    }
}

Output: ERROR: java.lang.RuntimeException: Exception occurred!

Example #5

zipWith() Method. The zipWith() method combines the result of this Mono with the result of another Mono.

import reactor.core.publisher.Mono;

public class FluxAndMonoServices {

    public Mono<String> fruitsMonoZipWith() {
        var fruits = Mono.just("Mango");
        var veggies = Mono.just("Tomato");

        return fruits.zipWith(veggies,
                (first,second) -> first+second).log();
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsMonoZipWith()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}

Output (log lines omitted):

Mono -> s = MangoTomato

Flux and Mono

Example #6 flatMap() and flatMapMany() Methods

flatMap() – Transform the item emitted by this Mono asynchronously, returning the value emitted by another Mono (possibly changing the value type).

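flatMapMany() – Transform the item emitted by this Mono into a Publisher, then forward its emissions into the returned Flux.

import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxAndMonoService {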

    public Mono<List<String>> fruitMonoFlatMap() {
        return Mono.just("Mango")
                .flatMap(s -> Mono.just(List.of(s.split(""))))
                .log();
    }

    public Flux<String> fruitMonoFlatMapMany() {
        return Mono.just("Mango")
                .flatMapMany(s -> Flux.just(s.split("")))
                .log();
    }

    public static void main(String[] args) {

        FluxAndMonoService fluxAndMonoServices
                = new FluxAndMonoService();

        fluxAndMonoServices.fruitMonoFlatMap()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitMonoFlatMapMany()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}

Output:

[ INFO] (main) | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext([M, a, n, g, o])
s = [M, a, n, g, o]
[ INFO] (main) | onComplete()
[ INFO] (main) onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(M)
Mono -> s = M
[ INFO] (main) onNext(a)
Mono -> s = a
[ INFO] (main) onNext(n)
Mono -> s = n
[ INFO] (main) onNext(g)
Mono -> s = g
[ INFO] (main) onNext(o)
Mono -> s = o
[ INFO] (main) onComplete()

Flux

Example #7

The following code shows an example of the basic subscribe() method with no arguments:

import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public static void main(String[] args) {
        // Set up a Flux that produces three values when a subscriber attaches.
        Flux<Integer> ints = Flux.range(1, 3);

        // Subscribe in the simplest way.
        ints.subscribe();
    }
}

Example #8

The preceding code produces no visible output, but it does work. The Flux produces three values. If we provide a lambda, we can make the values visible. The next example for the subscribe method shows one way to make the values appear:

import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public static void main(String[] args) {
        // Set up a Flux that produces three values when a subscriber attaches.
        Flux<Integer> ints = Flux.range(1, 3);

        // Subscribe with a subscriber that will print the values.
        ints.subscribe(i -> System.out.println(i));
    }
}

Output:

1
2
3

Example #9

To demonstrate the next signature, we intentionally introduce an error, as shown in the following example:

import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public static void main(String[] args) {
        Flux<Integer> ints = Flux.range(1, 4)
                // We need a map so that we can handle some values differently.
                .map(i -> {
                    // For most values, return the value.
                    if (i <= 3) return i;
                    // For one value, force an error.
                    throw new RuntimeException("Got to 4");
                });
        // Subscribe with a subscriber that includes an error handler.
        ints.subscribe(i -> System.out.println(i),
                error -> System.err.println("Error: " + error));
    }
}

Output

1
2
3
Error: java.lang.RuntimeException: Got to 4

Example #10

The next signature of the subscribe method includes both an error handler and a handler for completion events, as shown in the following example:

import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public static void main(String[] args) {
        Flux<Integer> ints = Flux.range(1, 4);
        
        // Subscribe with a Subscriber that includes a handler for completion events.
        ints.subscribe(i -> System.out.println(i),
                error -> System.err.println("Error " + error),
                () -> System.out.println("Done"));
    }
}

Error signals and completion signals are both terminal events and are exclusive of one another (you never get both). To make the completion consumer work, we must take care not to trigger an error.

The completion callback has no input, as represented by an empty pair of parentheses: It matches the run method in the Runnable interface. The preceding code produces the following output:

1
2
3
4
Done

Example #11

filter() Method Example. The filter() method evaluates each source value against the given Predicate and emits only the values that pass.

import java.util.List;

import reactor.core.publisher.Flux;

public class FluxAndMonoServices {

    public Flux<String> fruitsFluxFilter(int number) {
        return Flux.fromIterable(List.of("zjc","examples","Flux"))
                .filter(s -> s.length() > number);
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxFilter(2)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output

s = zjc
s = examples
s = Flux

Example #12

flatMap() and delayElements() Methods Example.

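import java.time.Duration;
import java.util.List;
import java.util.Random;

import reactor.core.publisher.Flux;

public class FluxAndMonoServices {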

    public Flux<String> stringsFluxFlatMap() {
        return Flux.fromIterable(List.of("zjc","examples","Flux"))
                .flatMap(s -> Flux.just(s.split("")))
                .log();
    }

    public Flux<String> stringsFluxFlatMapAsync() {
        return Flux.fromIterable(List.of("zjc","examples","Flux"))
                .flatMap(s -> Flux.just(s.split(""))
                        .delayElements(Duration.ofMillis(
                                new Random().nextInt(1000)
                        )))
                .log();
    }


    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.stringsFluxFlatMap()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.stringsFluxFlatMapAsync()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output

[ INFO] (main) onSubscribe(FluxFlatMap.FlatMapMain)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(z)
s = z
[ INFO] (main) onNext(j)
s = j
[ INFO] (main) onNext(c)
s = c
[ INFO] (main) onNext(e)
s = e
[ INFO] (main) onNext(x)
s = x
[ INFO] (main) onNext(a)
s = a
[ INFO] (main) onNext(m)
s = m
[ INFO] (main) onNext(p)
s = p
[ INFO] (main) onNext(l)
s = l
[ INFO] (main) onNext(e)
s = e
[ INFO] (main) onNext(s)
s = s
[ INFO] (main) onNext(F)
s = F
[ INFO] (main) onNext(l)
s = l
[ INFO] (main) onNext(u)
s = u
[ INFO] (main) onNext(x)
s = x
[ INFO] (main) onComplete()
[ INFO] (main) onSubscribe(FluxFlatMap.FlatMapMain)
[ INFO] (main) request(unbounded)

The second subscription logs only onSubscribe and request(unbounded): delayElements() emits on a separate scheduler thread, and main() returns before any delayed element arrives. To see the delayed, interleaved letters, keep the main thread alive until the Flux completes (for example by blocking on the result in a test).

Example #13

transform(), defaultIfEmpty() and switchIfEmpty() Methods

transform() – Transform this Flux in order to generate a target Flux.

defaultIfEmpty() – Provide a default unique value if this sequence is completed without any data.

switchIfEmpty() – Switch to an alternative Publisher if this sequence is completed without any data.

import java.util.List;
import java.util.function.Function;

import reactor.core.publisher.Flux;

public class FluxAndMonoServices {

    public Flux<String> fruitsFluxTransform(int number) {

        Function<Flux<String>,Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);

        return Flux.fromIterable(List.of("zjc12","ex1234","Flux12"))
                .transform(filterData)
                .log();
        //.filter(s -> s.length() > number);
    }

    public Flux<String> fruitsFluxTransformDefaultIfEmpty(int number) {

        Function<Flux<String>,Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);

        return Flux.fromIterable(List.of("zjc12","ex1234","Flux12"))
                .transform(filterData)
                .defaultIfEmpty("Default")
                .log();

    }

    public Flux<String> fruitsFluxTransformSwitchIfEmpty(int number) {

        Function<Flux<String>,Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);

        return Flux.fromIterable(List.of("zjc12","ex1234","Flux12"))
                .transform(filterData)
                .switchIfEmpty(Flux.just("123456789","1234 12345")
                        .transform(filterData))
                .log();

    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxTransform(5)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxTransformDefaultIfEmpty(6)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxTransformSwitchIfEmpty(6)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output

[ INFO] (main) | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(ex1234)
s = ex1234
[ INFO] (main) | onNext(Flux12)
s = Flux12
[ INFO] (main) | onComplete()
[ INFO] (main) onSubscribe([Fuseable] FluxDefaultIfEmpty.DefaultIfEmptySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(Default)
s = Default
[ INFO] (main) onComplete()
[ INFO] (main) onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(123456789)
s = 123456789
[ INFO] (main) onNext(1234 12345)
s = 1234 12345
[ INFO] (main) onComplete()

Example #14

concat() and concatWith() Methods

concat() – Concatenate all sources provided as arguments (or in an Iterable), forwarding the elements emitted by the sources downstream in order.

concatWith() – Concatenate emissions of this Flux with the provided Publisher (no interleave).

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxAndMonoServices {

    public Flux<String> fruitsFluxConcat() {
        var chars = Flux.just("abcde","fghijk");
        var nrs = Flux.just("123456","78901");

        return Flux.concat(chars,nrs);
    }

    public Flux<String> fruitsFluxConcatWith() {
        var chars = Flux.just("abcde","fghijk");
        var nrs = Flux.just("123456","78901");

        return chars.concatWith(nrs);
    }


    public Flux<String> fruitsMonoConcatWith() {
        var chars = Mono.just("abcde");
        var nrs = Mono.just("123456");

        return chars.concatWith(nrs);
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxConcat()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxConcatWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsMonoConcatWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output

s = abcde
s = fghijk
s = 123456
s = 78901
s = abcde
s = fghijk
s = 123456
s = 78901
s = abcde
s = 123456

Example #15

merge() and mergeWith() Methods

merge() – Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence.

mergeWith() – Merge data from this Flux and a Publisher into an interleaved merged sequence.

import reactor.core.publisher.Flux;

public class FluxAndMonoServices {

    public Flux<String> fruitsFluxMerge() {
        var chars = Flux.just("abcde","fghijk");
        var nrs = Flux.just("123456","78901");
        return Flux.merge(chars, nrs);
    }

    public Flux<String> fruitsFluxMergeWith() {
        var chars = Flux.just("abcde","fghijk");
        var nrs = Flux.just("123456","78901");
        return chars.mergeWith(nrs);
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxMerge()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxMergeWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output

s = abcde
s = fghijk
s = 123456
s = 78901
s = abcde
s = fghijk
s = 123456
s = 78901

The output is the same as for concat() because Flux.just() emits synchronously: merge() subscribes to its sources eagerly, but the first source finishes emitting before the second one starts. Interleaving only becomes visible when the sources emit asynchronously, for example with delayElements().

Code: https://github.com/ZbCiok/reactor-mono-and-flux-examples