https://zbciok.github.io/docs/project-reactor
https://projectreactor.io/docs/core/release/reference/#about-doc
https://eherrera.net/project-reactor-course/
Reactor is a fully non-blocking reactive programming foundation for the JVM, with efficient demand management (in the form of managing “backpressure”). It integrates directly with the Java 8 functional APIs, notably CompletableFuture, Stream, and Duration. It offers composable asynchronous sequence APIs — Flux (for [N] elements) and Mono (for [0|1] elements) — and extensively implements the Reactive Streams specification.
Reactor also supports non-blocking inter-process communication with the reactor-netty project. Suited for Microservices Architecture, Reactor Netty offers backpressure-ready network engines for HTTP (including Websockets), TCP, and UDP. Reactive encoding and decoding are fully supported.
Maven Installation
First, you need to import the BOM by adding the following snippet to your pom.xml:
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2022.0.12</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
Next, add your dependencies to the relevant reactor projects:
<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
Reactive Streams
Reactive Streams is a standard for asynchronous stream processing with non-blocking backpressure. Since Java 9, its interfaces have been part of the JDK as the nested interfaces of java.util.concurrent.Flow. The Reactive Streams specification is a set of rules to follow when designing a reactive stream. It defines four interfaces that you implement when creating one.
Generalized Streams
In a generalized stream pipeline, data flows from the producer, through the processing stages, to the consumer:

If you consider that the components above can have different processing speeds, there are two possible scenarios:
- If the downstream (i.e. the component that receives data) is faster than the upstream (the component that sends data), you’re all good, since the pipeline should be working smoothly.
- If, however, the upstream is faster, then the downstream becomes flooded with data and things start getting worse.
In the latter case, there are a few strategies for dealing with the excess data:
- Buffer it — but buffers have limited capacity and you’re going to run out of memory sooner or later.
- Drop it — but then you lose data (which is usually not desired, but can make sense in certain cases — e.g. this is what networking hardware often does).
- Block until the consumer is done with it — but this may result in slowing down the entire pipeline.
The preferred way of dealing with those different processing capabilities is a technique called backpressure — which boils down to the slower consumer requesting a given amount of data from the faster producer — but only an amount the consumer is able to process at that time.
Coming back to the streaming pipeline diagram, you can think of backpressure as a special kind of signalling data flowing in the opposite direction (compared to the regular data that is being processed):
Reactive Streams

Key factors that make a stream reactive:
- the data is processed asynchronously,
- the backpressure mechanism is non-blocking,
- the fact that the downstream can be slower than the upstream is somehow represented in the domain model.
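The request-driven flow control described above can be sketched with the JDK's own java.util.concurrent.Flow interfaces, so no extra dependency is needed. This is a minimal illustration, not a production pattern: the names BackpressureSketch, OneAtATimeSubscriber, and run are hypothetical, and the subscriber simply asks for one item at a time so the publisher can never deliver more than it is ready for.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class BackpressureSketch {

    /** Hypothetical subscriber that requests exactly one item at a time. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand: a single item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // signal readiness for the next item
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 1..count and returns what the subscriber received. */
    static List<Integer> run(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks while the subscriber's buffer is full
            }
        } // close() signals onComplete to the subscriber
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // [1, 2, 3, 4, 5]
    }
}
```

SubmissionPublisher happens to combine the strategies listed earlier: it buffers per subscriber, submit() blocks when that buffer is full, and offer() can drop items instead. The demand signal itself (request(1)) never blocks.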
Reactive Streams Specification
The specification defines four interfaces that you implement when creating a reactive stream.
Publisher
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
Subscriber
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
Subscription
public interface Subscription {
    public void request(long n);
    public void cancel();
}
Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Reactive Programming Libraries
A reactive library is nothing but the implementation of reactive specification interfaces. Here are some reactive libraries that are available to us:
- Project Reactor
- RxJava
- JDK 9 Flow API (java.util.concurrent.Flow)
Project Reactor
Project Reactor is a fourth-generation reactive library that implements the Reactive Streams specification for building non-blocking applications on the JVM.
Project Reactor provides two implementations of the Publisher interface:
- Mono
- Flux
Mono: emits 0 or 1 element.
A Mono produces at most one value.
Flux: emits 0…N elements.
A Flux can produce multiple values and can be endless.
Reactive Stream Workflow

Reactive stream workflow:
1. The Subscriber calls the subscribe() method of the Publisher to register with it.
2. The Publisher creates a Subscription and passes it to the Subscriber through onSubscribe(), confirming that the subscription succeeded.
3. The Subscriber calls the request(n) method of the Subscription to request data from the Publisher.
4. The Publisher calls onNext(data) once per item, up to the n items requested. For example, if 10 items are requested and available, the Publisher calls onNext(data) 10 times.
5. Once the Publisher has sent all the data, it calls onComplete() to notify the Subscriber. If an error occurs while sending the data, the Publisher calls onError() instead to pass the error details to the Subscriber.
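The steps above can be traced with a hand-written publisher built on the JDK's java.util.concurrent.Flow interfaces. This is only a sketch under simplifying assumptions: WorkflowSketch and trace are hypothetical names, everything runs synchronously on one thread, and the publisher skips the thread-safety and validation rules that a specification-compliant implementation must follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class WorkflowSketch {

    /** Runs the handshake and returns the signals in the order they occurred. */
    static List<String> trace(List<String> items, long demand) {
        List<String> log = new ArrayList<>();

        // A deliberately simple, synchronous Publisher (illustration only).
        Flow.Publisher<String> publisher = subscriber -> {
            log.add("subscribe");
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private boolean finished = false;

                @Override
                public void request(long n) {
                    log.add("request(" + n + ")");
                    // Emit at most n items, never more than were requested.
                    for (long sent = 0; sent < n && next < items.size() && !finished; sent++) {
                        subscriber.onNext(items.get(next++));
                    }
                    if (next == items.size() && !finished) {
                        finished = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    finished = true;
                }
            });
        };

        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                log.add("onSubscribe");
                subscription.request(demand);
            }

            @Override
            public void onNext(String item) {
                log.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable throwable) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        // [subscribe, onSubscribe, request(3), onNext(a), onNext(b), onNext(c), onComplete]
        System.out.println(trace(List.of("a", "b", "c"), 3));
    }
}
```

Calling trace(List.of("a", "b", "c"), 2) stops after onNext(b) with no onComplete, because the subscriber never requests the third item.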
Mono and Flux
Mono
The following image shows how a Mono transforms an item:

A Mono<T> is a specialized Publisher<T> that emits at most one item via the onNext signal then terminates with an onComplete signal (successful Mono, with or without value), or only emits a single onError signal (failed Mono).
Most Mono implementations are expected to immediately call onComplete on their Subscriber after having called onNext. Mono.never() is an outlier: it doesn’t emit any signal, which is not technically forbidden although not terribly useful outside of tests. On the other hand, a combination of onNext and onError is explicitly forbidden.
Mono offers only a subset of the operators that are available for a Flux, and some operators (notably those that combine the Mono with another Publisher) switch to a Flux. For example, Mono#concatWith(Publisher) returns a Flux while Mono#then(Mono) returns another Mono.
Note that you can use a Mono to represent no-value asynchronous processes that only have the concept of completion (similar to a Runnable). To create one, you can use an empty Mono<Void>.
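A completion-only Mono<Void> might look like the following sketch. It assumes reactor-core is on the classpath; the class name CompletionOnlySketch and the event strings are hypothetical. Mono.fromRunnable() wraps a task that produces no value, so the subscriber only ever sees a completion (or an error).

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

final class CompletionOnlySketch {

    /** Subscribes to a value-less Mono and returns the events observed, in order. */
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // A Mono<Void> cannot emit onNext; it only completes or fails.
        Mono<Void> task = Mono.fromRunnable(() -> events.add("task ran"));

        task.subscribe(
                ignored -> events.add("onNext"),          // never invoked
                error -> events.add("onError: " + error), // only on failure
                () -> events.add("onComplete"));          // signals the task finished
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [task ran, onComplete]
    }
}
```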
Flux
The following image shows how a Flux transforms items:

A Flux<T> is a standard Publisher<T> that represents an asynchronous sequence of 0 to N emitted items, optionally terminated by either a completion signal or an error. As in the Reactive Streams spec, these three types of signal translate to calls to a downstream Subscriber’s onNext, onComplete, and onError methods.
With this large scope of possible signals, Flux is the general-purpose reactive type. Note that all events, even terminating ones, are optional: no onNext event but an onComplete event represents an empty finite sequence, but remove the onComplete and you have an infinite empty sequence (not particularly useful, except for tests around cancellation). Similarly, infinite sequences are not necessarily empty. For example, Flux.interval(Duration) produces a Flux<Long> that is infinite and emits regular ticks from a clock.
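As a small illustration of an infinite sequence, the sketch below bounds Flux.interval() with take() so that it terminates. It assumes reactor-core is on the classpath; IntervalSketch and firstTicks are hypothetical names, and block() is used only to keep the example short.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

final class IntervalSketch {

    /** Collects the first `count` ticks of an otherwise infinite interval. */
    static List<Long> firstTicks(int count) {
        return Flux.interval(Duration.ofMillis(10)) // emits 0, 1, 2, ... forever
                .take(count)                        // cancel upstream after `count` items
                .collectList()
                .block();                           // wait for the bounded sequence
    }

    public static void main(String[] args) {
        System.out.println(firstTicks(3)); // [0, 1, 2]
    }
}
```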
Mono and Flux Examples
Mono
When we subscribe to a Publisher (Mono), it starts emitting signals like:
- onNext: When the Publisher receives a request from the Consumer, it starts emitting data in the form of a stream of events by invoking the onNext() method of the Subscriber interface.
- onError: In case of an error, the exception will be sent in the form of an event to the Subscriber, using the onError() method.
- onComplete: When the Publisher finishes with sending data, it will notify the Consumer via the onComplete() method.
Nothing will happen with the Publisher if we don’t start consuming it.
When we call one of the overloaded subscribe() methods on the Mono, we ask the Publisher to start emitting data.
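The laziness described above can be observed with a counter. This is a sketch that assumes reactor-core is on the classpath; LazySubscriptionSketch and countInvocations are hypothetical names. It uses Mono.fromSupplier() rather than Mono.just(), because just() captures an already computed value while fromSupplier() defers the work until subscription.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

final class LazySubscriptionSketch {

    /** Returns the supplier call count before and after subscribing twice. */
    static int[] countInvocations() {
        AtomicInteger calls = new AtomicInteger();

        // Assembling the Mono does not run the supplier.
        Mono<String> mono = Mono.fromSupplier(() -> {
            calls.incrementAndGet();
            return "Hello";
        });
        int beforeSubscribe = calls.get();

        // Each subscription triggers the supplier again.
        mono.subscribe();
        mono.subscribe();
        int afterSubscribe = calls.get();

        return new int[] {beforeSubscribe, afterSubscribe};
    }

    public static void main(String[] args) {
        int[] counts = countInvocations();
        System.out.println(counts[0] + " -> " + counts[1]); // 0 -> 2
    }
}
```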
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>zjc.examples</groupId>
<artifactId>mono-and-flux-xamples</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2022.0.12</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Example #1
Create and subscribe to Mono:
package zjc.examples;

import reactor.core.publisher.Mono;

public class Main {
    public static void main(String[] args) {
        // create a Mono
        Mono<String> mono = Mono.just("Hello");
        // subscribe to a Mono
        mono.subscribe();
    }
}
We started consuming the data, but we are not doing anything with it.
Example #2
import reactor.core.publisher.Mono;

class ReactiveJavaTutorial {
    public static void main(String[] args) {
        // create a Mono
        Mono<String> mono = Mono.just("Hello");
        // subscribe to a Mono and print the emitted value
        mono.subscribe(data -> System.out.println(data));
    }
}
Output:
Hello
This was one simple example of how to subscribe to a Mono using the subscribe(Consumer consumer) method that subscribes a Consumer to the Mono.
Example #3
Let’s subscribe and define what should be triggered for each of the 3 events.
import reactor.core.publisher.Mono;

public class Main {
    public static void main(String[] args) {
        // create a Mono
        Mono<String> mono = Mono.just("Hello");
        // subscribe to a Mono with a handler for each signal
        mono.subscribe(
                data -> System.out.println(data),      // onNext
                err -> System.out.println(err),        // onError
                () -> System.out.println("Completed!") // onComplete
        );
    }
}
Output:
Hello
Completed!
Here we used the subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer) method, which subscribes a Consumer to this Mono that consumes the emitted element, handles errors, and reacts to completion.
Example #4
Let’s invoke the onError signal:
import reactor.core.publisher.Mono;

public class Main {
    public static void main(String[] args) {
        // create a Mono whose supplier always fails
        Mono<String> mono = Mono.fromSupplier(() -> {
            throw new RuntimeException("Exception occurred!");
        });
        // subscribe to a Mono with a handler for each signal
        mono.subscribe(
                data -> System.out.println(data),           // onNext
                err -> System.out.println("ERROR: " + err), // onError
                () -> System.out.println("Completed!")      // onComplete
        );
    }
}
Output:
ERROR: java.lang.RuntimeException: Exception occurred!
Example #5
zipWith() method: combines the result of this Mono with the result of another Mono.
import reactor.core.publisher.Mono;

public class FluxAndMonoServices {
    public Mono<String> fruitsMonoZipWith() {
        var fruits = Mono.just("Mango");
        var veggies = Mono.just("Tomato");
        // combine both values once each Mono has emitted
        return fruits.zipWith(veggies,
                (first, second) -> first + second).log();
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();
        fluxAndMonoServices.fruitsMonoZipWith()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}
Output:
Mono -> s = MangoTomato
Flux and Mono
Example #6 flatMap() and flatMapMany() Methods
flatMap() – Transform the item emitted by this Mono asynchronously, returning the value emitted by another Mono (possibly changing the value type).
flatMapMany() – Transform the item emitted by this Mono into a Publisher, then forward its emissions into the returned Flux.
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxAndMonoService {
    public Mono<List<String>> fruitMonoFlatMap() {
        return Mono.just("Mango")
                // map the value to another Mono holding its characters
                .flatMap(s -> Mono.just(List.of(s.split(""))))
                .log();
    }

    public Flux<String> fruitMonoFlatMapMany() {
        return Mono.just("Mango")
                // map the value to a Flux that emits each character
                .flatMapMany(s -> Flux.just(s.split("")))
                .log();
    }

    public static void main(String[] args) {
        FluxAndMonoService fluxAndMonoServices
                = new FluxAndMonoService();
        fluxAndMonoServices.fruitMonoFlatMap()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
        fluxAndMonoServices.fruitMonoFlatMapMany()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}
Output:
[ INFO] (main) | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext([M, a, n, g, o])
s = [M, a, n, g, o]
[ INFO] (main) | onComplete()
[ INFO] (main) onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(M)
Mono -> s = M
[ INFO] (main) onNext(a)
Mono -> s = a
[ INFO] (main) onNext(n)
Mono -> s = n
[ INFO] (main) onNext(g)
Mono -> s = g
[ INFO] (main) onNext(o)
Mono -> s = o
[ INFO] (main) onComplete()
Flux
Example #7
The following code shows an example of the basic subscribe() method with no arguments:
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public static void main(String[] args) {
        // Set up a Flux that produces three values when a subscriber attaches.
        Flux<Integer> ints = Flux.range(1, 3);
        // Subscribe in the simplest way.
        ints.subscribe();
    }
}
Example #8
The preceding code produces no visible output, but it does work. The Flux produces three values. If we provide a lambda, we can make the values visible. The next example for the subscribe method shows one way to make the values appear:
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public static void main(String[] args) {
        // Set up a Flux that produces three values when a subscriber attaches.
        Flux<Integer> ints = Flux.range(1, 3);
        // Subscribe with a subscriber that will print the values.
        ints.subscribe(i -> System.out.println(i));
    }
}
Output:
1
2
3
Example #9
To demonstrate the next signature, we intentionally introduce an error, as shown in the following example:
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public static void main(String[] args) {
        Flux<Integer> ints = Flux.range(1, 4)
                // We need a map so that we can handle some values differently.
                .map(i -> {
                    // For most values, return the value.
                    if (i <= 3) return i;
                    // For one value, force an error.
                    throw new RuntimeException("Got to 4");
                });
        // Subscribe with a subscriber that includes an error handler.
        ints.subscribe(i -> System.out.println(i),
                error -> System.err.println("Error: " + error));
    }
}
Output
1
2
3
Error: java.lang.RuntimeException: Got to 4
Example #10
The next signature of the subscribe method includes both an error handler and a handler for completion events, as shown in the following example:
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public static void main(String[] args) {
        Flux<Integer> ints = Flux.range(1, 4);
        // Subscribe with a Subscriber that includes a handler for completion events.
        ints.subscribe(i -> System.out.println(i),
                error -> System.err.println("Error " + error),
                () -> System.out.println("Done"));
    }
}
Error signals and completion signals are both terminal events and are exclusive of one another (you never get both). To make the completion consumer work, we must take care not to trigger an error.
The completion callback has no input, as represented by an empty pair of parentheses: It matches the run method in the Runnable interface. The preceding code produces the following output:
1
2
3
4
Done
Example #11
filter() Method Example. The filter() method evaluates each source value against the given Predicate and emits only the values that pass the test.
import java.util.List;
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public Flux<String> fruitsFluxFilter(int number) {
        return Flux.fromIterable(List.of("zjc", "examples", "Flux"))
                // keep only strings longer than `number` characters
                .filter(s -> s.length() > number);
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices = new FluxAndMonoServices();
        fluxAndMonoServices.fruitsFluxFilter(2)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}
Output
s = zjc
s = examples
s = Flux
Example #12
flatMap() and delayElements() Methods Example.
import java.time.Duration;
import java.util.List;
import java.util.Random;
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public Flux<String> stringsFluxFlatMap() {
        return Flux.fromIterable(List.of("zjc", "examples", "Flux"))
                // flatten each string into a Flux of its characters
                .flatMap(s -> Flux.just(s.split("")))
                .log();
    }

    public Flux<String> stringsFluxFlatMapAsync() {
        return Flux.fromIterable(List.of("zjc", "examples", "Flux"))
                // each inner Flux delays its elements by a random interval
                .flatMap(s -> Flux.just(s.split(""))
                        .delayElements(Duration.ofMillis(
                                new Random().nextInt(1000))))
                .log();
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();
        fluxAndMonoServices.stringsFluxFlatMap()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
        fluxAndMonoServices.stringsFluxFlatMapAsync()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}
Output
[ INFO] (main) onSubscribe(FluxFlatMap.FlatMapMain)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(z)
s = z
[ INFO] (main) onNext(j)
s = j
[ INFO] (main) onNext(c)
s = c
[ INFO] (main) onNext(e)
s = e
[ INFO] (main) onNext(x)
s = x
[ INFO] (main) onNext(a)
s = a
[ INFO] (main) onNext(m)
s = m
[ INFO] (main) onNext(p)
s = p
[ INFO] (main) onNext(l)
s = l
[ INFO] (main) onNext(e)
s = e
[ INFO] (main) onNext(s)
s = s
[ INFO] (main) onNext(F)
s = F
[ INFO] (main) onNext(l)
s = l
[ INFO] (main) onNext(u)
s = u
[ INFO] (main) onNext(x)
s = x
[ INFO] (main) onComplete()
[ INFO] (main) onSubscribe(FluxFlatMap.FlatMapMain)
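[ INFO] (main) request(unbounded)
The second subscription logs no elements because delayElements() emits on a separate scheduler thread and main() returns before the delayed elements arrive. Blocking until completion, for example with blockLast(), would keep the program alive long enough to see them.
Example #13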
transform(), defaultIfEmpty() and switchIfEmpty() Methods
transform() – Transform this Flux in order to generate a target Flux.
defaultIfEmpty() – Provide a default unique value if this sequence is completed without any data.
switchIfEmpty() – Switch to an alternative Publisher if this sequence is completed without any data.
import java.util.List;
import java.util.function.Function;
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public Flux<String> fruitsFluxTransform(int number) {
        // reusable operator chain, applied through transform()
        Function<Flux<String>, Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);
        // same result as calling .filter(s -> s.length() > number) directly
        return Flux.fromIterable(List.of("zjc12", "ex1234", "Flux12"))
                .transform(filterData)
                .log();
    }

    public Flux<String> fruitsFluxTransformDefaultIfEmpty(int number) {
        Function<Flux<String>, Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);
        return Flux.fromIterable(List.of("zjc12", "ex1234", "Flux12"))
                .transform(filterData)
                // emit a single fallback value if nothing passes the filter
                .defaultIfEmpty("Default")
                .log();
    }

    public Flux<String> fruitsFluxTransformSwitchIfEmpty(int number) {
        Function<Flux<String>, Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);
        return Flux.fromIterable(List.of("zjc12", "ex1234", "Flux12"))
                .transform(filterData)
                // switch to a fallback Publisher if nothing passes the filter
                .switchIfEmpty(Flux.just("123456789", "1234 12345")
                        .transform(filterData))
                .log();
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();
        fluxAndMonoServices.fruitsFluxTransform(5)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
        fluxAndMonoServices.fruitsFluxTransformDefaultIfEmpty(6)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
        fluxAndMonoServices.fruitsFluxTransformSwitchIfEmpty(6)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}
Output
[ INFO] (main) | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(ex1234)
s = ex1234
[ INFO] (main) | onNext(Flux12)
s = Flux12
[ INFO] (main) | onComplete()
[ INFO] (main) onSubscribe([Fuseable] FluxDefaultIfEmpty.DefaultIfEmptySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(Default)
s = Default
[ INFO] (main) onComplete()
[ INFO] (main) onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(123456789)
s = 123456789
[ INFO] (main) onNext(1234 12345)
s = 1234 12345
[ INFO] (main) onComplete()
Example #14
concat() and concatWith() Methods
concat() – Concatenate all sources provided in an Iterable, forwarding the elements emitted by the sources downstream.
concatWith() – Concatenate emissions of this Flux with the provided Publisher (no interleave).
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxAndMonoServices {
    public Flux<String> fruitsFluxConcat() {
        var chars = Flux.just("abcde", "fghijk");
        var nrs = Flux.just("123456", "78901");
        return Flux.concat(chars, nrs);
    }

    public Flux<String> fruitsFluxConcatWith() {
        var chars = Flux.just("abcde", "fghijk");
        var nrs = Flux.just("123456", "78901");
        return chars.concatWith(nrs);
    }

    public Flux<String> fruitsMonoConcatWith() {
        var chars = Mono.just("abcde");
        var nrs = Mono.just("123456");
        // concatenating two Monos yields a Flux
        return chars.concatWith(nrs);
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();
        fluxAndMonoServices.fruitsFluxConcat()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
        fluxAndMonoServices.fruitsFluxConcatWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
        fluxAndMonoServices.fruitsMonoConcatWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}
Output
s = abcde
s = fghijk
s = 123456
s = 78901
s = abcde
s = fghijk
s = 123456
s = 78901
s = abcde
s = 123456
Example #15
merge() and mergeWith() Methods
merge() – Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence.
mergeWith() – Merge data from this Flux and a Publisher into an interleaved merged sequence.
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {
    public Flux<String> fruitsFluxMerge() {
        var chars = Flux.just("abcde", "fghijk");
        var nrs = Flux.just("123456", "78901");
        return Flux.merge(chars, nrs);
    }

    public Flux<String> fruitsFluxMergeWith() {
        var chars = Flux.just("abcde", "fghijk");
        var nrs = Flux.just("123456", "78901");
        return chars.mergeWith(nrs);
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();
        fluxAndMonoServices.fruitsFluxMerge()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
        fluxAndMonoServices.fruitsFluxMergeWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}
Output
s = abcde
s = fghijk
s = 123456
s = 78901
s = abcde
s = fghijk
s = 123456
s = 78901
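In the output above, merge() looks identical to concat() because each Flux.just() source emits all of its elements as soon as it is subscribed, so the first source finishes before the second one starts. Interleaving only becomes visible when the sources emit over time. The sketch below is one way to show that, assuming reactor-core is on the classpath; MergeInterleaveSketch and its method names are hypothetical, and the delays are arbitrary values chosen for illustration.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

final class MergeInterleaveSketch {

    // Emits "a" and "b", each after a 100 ms delay.
    static Flux<String> letters() {
        return Flux.just("a", "b").delayElements(Duration.ofMillis(100));
    }

    // Emits "1" and "2", each after a 150 ms delay.
    static Flux<String> digits() {
        return Flux.just("1", "2").delayElements(Duration.ofMillis(150));
    }

    /** concat() subscribes to digits() only after letters() completes. */
    static List<String> concatenated() {
        return Flux.concat(letters(), digits()).collectList().block();
    }

    /** merge() subscribes to both sources eagerly and forwards items as they arrive. */
    static List<String> merged() {
        return Flux.merge(letters(), digits()).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("concat: " + concatenated()); // [a, b, 1, 2]
        System.out.println("merge:  " + merged());       // order depends on timing
    }
}
```

With these delays the merged list usually comes out as a, 1, b, 2, but the exact order depends on timing, so it should not be relied upon.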