ReactiveX / RxJava
Example#1
package org.example;

import io.reactivex.rxjava3.core.Flowable;

public class Main {
    public static void main(String[] args) {
        Flowable.just("Hello World!").subscribe(System.out::println);
    }
}
Output
Hello World!
Example#2
package org.example;

import io.reactivex.rxjava3.core.Observable;

public class Main {
    public static void main(String[] args) {
        // Build an observable that emits an integer sequence.
        Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);
        // Subscribe to the observable and print each emitted value.
        observable.subscribe(value -> System.out.println("Received: " + value));
    }
}
In this example, we create an observable sequence of integers using RxJava. We then subscribe to this observable, and as it emits values, the subscribed observer reacts by printing each received value.
Output
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>ReactiveJavaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>io.reactivex.rxjava3</groupId>
            <artifactId>rxjava</artifactId>
            <version>3.1.8</version>
        </dependency>
    </dependencies>
</project>
RxJava: Basic Guide: https://jreact.com/index.php/2023/11/05/rxjava-basic-guide/
RxJava
RxJava is a Java-based implementation of ReactiveX. Its key characteristics are:
- Extends the observer pattern.
- Supports sequences of data/events.
- Provides operators to compose sequences together declaratively.
- Handles threading, synchronization, thread-safety and concurrent data structures internally.
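To illustrate the declarative operator composition mentioned in the list above, here is a minimal sketch. The class name OperatorChainDemo and the helper evenSquares are hypothetical names chosen for this illustration; the operators themselves (range, filter, map, toList, blockingGet) are part of RxJava 3.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.List;

public class OperatorChainDemo {
    // Hypothetical helper: collects the squares of the even numbers in 1..10.
    static List<Integer> evenSquares() {
        return Observable.range(1, 10)      // emits 1, 2, ..., 10
                .filter(n -> n % 2 == 0)    // keep only even numbers
                .map(n -> n * n)            // square each remaining item
                .toList()                   // gather the items into a Single<List<Integer>>
                .blockingGet();             // wait for the result (fine for a demo)
    }

    public static void main(String[] args) {
        System.out.println(evenSquares());
    }
}
```

Each operator returns a new Observable, so the pipeline describes what should happen to the items rather than how to loop over them.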
How Observable works
Observables represent sources of data, whereas Observers (Subscribers) listen to them. In a nutshell, an Observable emits items and a Subscriber consumes them.
Observable
- An Observable starts providing data once a subscriber starts listening.
- An Observable can emit any number of items.
- An Observable can signal completion without emitting any item.
- An Observable can terminate successfully.
- An Observable may never terminate, e.g. a button can be clicked any number of times.
- An Observable may signal an error at any point in time.
Observer (Subscriber)
- An Observable can have multiple observers (subscribers).
- When an Observable emits an item, each subscriber's onNext() method is invoked.
- When an Observable finishes emitting items, each subscriber's onComplete() method is invoked.
- If an Observable signals an error, each subscriber's onError() method is invoked.
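The callbacks listed above can be sketched as follows. This is an illustrative example, not the only way to subscribe: the class MultipleObserversDemo, the method run and the observer labels "A" and "B" are hypothetical. It uses the subscribe(onNext, onError, onComplete) overload of Observable.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.ArrayList;
import java.util.List;

public class MultipleObserversDemo {
    // Subscribes two observers (hypothetical labels "A" and "B") to the same
    // source and records every callback that is invoked.
    static List<String> run(Observable<Integer> source) {
        List<String> log = new ArrayList<>();
        for (String name : new String[] {"A", "B"}) {
            source.subscribe(
                    item -> log.add(name + " onNext " + item),
                    error -> log.add(name + " onError " + error.getMessage()),
                    () -> log.add(name + " onComplete"));
        }
        return log;
    }

    public static void main(String[] args) {
        // Normal termination: each observer sees every item, then onComplete.
        System.out.println(run(Observable.just(1, 2)));
        // Failure: each observer receives onError instead of onComplete.
        System.out.println(run(Observable.error(new IllegalStateException("boom"))));
    }
}
```

Because Observable.just is a cold, synchronous source, observer A receives the whole sequence before observer B is subscribed; each subscription replays the items independently.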
Classes to Create Observables
- Flowable − 0..N items. Supports Reactive Streams and back-pressure.
- Observable − 0..N items, but no back-pressure.
- Single − 1 item or error. Can be treated as a reactive version of a method call.
- Completable − No item emitted. Used as a signal for completion or error. Can be treated as a reactive version of Runnable.
- Maybe − Either no item or 1 item emitted. Can be treated as a reactive version of Optional.
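As a rough side-by-side of the five base classes listed above, the following sketch creates one source of each kind and consumes it with a blocking call. The class ReactiveTypesDemo and the method run are hypothetical names; blocking calls are used only to keep the demo short.

```java
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import java.util.ArrayList;
import java.util.List;

public class ReactiveTypesDemo {
    // Hypothetical helper: one line of output per reactive base type.
    static List<String> run() {
        List<String> out = new ArrayList<>();
        // Flowable: 0..N items with back-pressure support.
        out.add("Flowable: " + Flowable.range(1, 3).toList().blockingGet());
        // Observable: 0..N items without back-pressure.
        out.add("Observable: " + Observable.just("a", "b").toList().blockingGet());
        // Single: exactly one item (or an error).
        out.add("Single: " + Single.just(42).blockingGet());
        // Maybe: zero or one item; an empty Maybe falls back to the given default here.
        out.add("Maybe: " + Maybe.<String>empty().blockingGet("fallback"));
        // Completable: no item, only completion (or an error).
        Completable.fromAction(() -> out.add("Completable: action ran")).blockingAwait();
        return out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```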
Methods to Create Observables
In Observable Class:
- just(T item) − Returns an Observable that signals the given (constant reference) item and then completes.
- fromIterable(Iterable source) − Converts an Iterable sequence into an ObservableSource that emits the items in the sequence.
- fromArray(T… items) − Converts an Array into an ObservableSource that emits the items in the Array.
- fromCallable(Callable supplier) − Returns an Observable that, when an observer subscribes to it, invokes a function you specify and then emits the value returned from that function.
- fromFuture(Future future) − Converts a Future into an ObservableSource.
- interval(long initialDelay, long period, TimeUnit unit) − Returns an Observable that emits a 0L after the initialDelay and ever increasing numbers after each period of time thereafter.
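The factory methods listed above can be tried out with a small sketch like the one below. The class FactoryMethodsDemo and the method run are hypothetical; the 10 ms period passed to interval is an arbitrary value chosen to keep the demo fast.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class FactoryMethodsDemo {
    // Hypothetical helper: exercises each factory method and records what it emitted.
    static List<String> run() {
        List<String> out = new ArrayList<>();
        out.add("just: " + Observable.just("x").toList().blockingGet());
        out.add("fromIterable: "
                + Observable.fromIterable(Arrays.asList(1, 2, 3)).toList().blockingGet());
        out.add("fromArray: " + Observable.fromArray("a", "b").toList().blockingGet());
        // The callable is invoked only when an observer subscribes.
        out.add("fromCallable: " + Observable.fromCallable(() -> "computed").blockingFirst());
        out.add("fromFuture: "
                + Observable.fromFuture(CompletableFuture.completedFuture("done")).blockingFirst());
        // interval never completes on its own, so take(3) limits it to the first three ticks.
        out.add("interval: "
                + Observable.interval(0, 10, TimeUnit.MILLISECONDS).take(3).toList().blockingGet());
        return out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```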
Creating Observables
This list shows methods that create reactive sources, such as Observables.
See: https://github.com/ReactiveX/RxJava/wiki/Creating-Observables
targetType.from{sourceType}()
from{sourceType}() wraps or converts another reactive type to the target reactive type. The following combinations are available in the various reactive types with the following signature pattern: targetType.from{sourceType}(). Not every possible conversion is implemented via the from{reactive type} method families; check the to{reactive type} method families for further conversion possibilities.
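A few conversions from both families, sketched with RxJava types only so that no other library is needed. The class ConversionDemo and its method names are hypothetical; fromPublisher, fromSingle and toFlowable are RxJava 3 methods.

```java
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import java.util.List;

public class ConversionDemo {
    // from{sourceType}: wrap a Reactive Streams Publisher (here a Flowable) as an Observable.
    static List<Integer> publisherToObservable() {
        Flowable<Integer> publisher = Flowable.just(1, 2, 3);
        return Observable.fromPublisher(publisher).toList().blockingGet();
    }

    // from{sourceType}: view a Single as a Maybe that always has a value.
    static String singleToMaybe() {
        return Maybe.fromSingle(Single.just("one")).blockingGet();
    }

    // to{reactive type}: going from Observable to Flowable requires a backpressure strategy.
    static List<Integer> observableToFlowable() {
        return Observable.just(4, 5)
                .toFlowable(BackpressureStrategy.BUFFER)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(publisherToObservable());
        System.out.println(singleToMaybe());
        System.out.println(observableToFlowable());
    }
}
```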

Example: (see also: http://jreact.com/index.php/2023/10/27/rxjava-examples/ )
// Mono comes from Project Reactor (reactor.core.publisher.Mono), so this snippet
// needs the reactor-core dependency in addition to RxJava.
Mono<Integer> reactorMono = Mono.fromCompletionStage(CompletableFuture.<Integer>completedFuture(1));
Observable<Integer> observable = Observable.fromPublisher(reactorMono);
observable.subscribe(
    item -> System.out.println(item),
    error -> error.printStackTrace(),
    () -> System.out.println("Done"));
Observable
http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Observable.html
public abstract class Observable<T> extends Object implements ObservableSource<T>
The Observable class is the non-backpressured, optionally multi-valued base reactive class that offers factory methods, intermediate operators and the ability to consume synchronous and/or asynchronous reactive dataflows.
Many operators in the class accept ObservableSource(s), the base reactive interface for such non-backpressured flows, which Observable itself implements as well.
The Observable's operators, by default, run with a buffer size of 128 elements (see Flowable.bufferSize()), that can be overridden globally via the system parameter rx3.buffer-size. Most operators, however, have overloads that allow setting their internal buffer size explicitly.
The documentation for this class makes use of marble diagrams.
[Figure: legend for the marble diagrams]

Examples – see: http://jreact.com/index.php/2023/10/27/rxjava-examples/
Example#1
Observable.just
Observable<String> observable = Observable.just("Hello, world");
observable.subscribe(System.out::println);
Example#2
Observable.fromIterable
List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
Observable<Integer> observable = Observable.fromIterable(list);
observable.subscribe(
    item -> System.out.println(item),
    error -> error.printStackTrace(),
    () -> System.out.println("Done"));
Flowable
http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html
public abstract class Flowable<T> extends Object implements Publisher<T>
The Flowable class implements the Reactive Streams Publisher pattern and offers factory methods, intermediate operators and the ability to consume reactive dataflows.
Reactive Streams operates with Publishers, an interface that Flowable implements. Many operators therefore accept general Publishers directly and allow direct interoperation with other Reactive Streams implementations.
The Flowable hosts the default buffer size of 128 elements for operators, accessible via bufferSize(), that can be overridden globally via the system parameter rx3.buffer-size. Most operators, however, have overloads that allow setting their internal buffer size explicitly.
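The buffer size described above can be inspected and overridden per operator, as in this small sketch. The class BufferSizeDemo and its methods are hypothetical, and the value 16 is an arbitrary choice; observeOn(Scheduler, boolean, int) is one of the overloads that accepts an explicit buffer size.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.List;

public class BufferSizeDemo {
    // Returns the global default: 128 unless the rx3.buffer-size system property overrides it.
    static int defaultBufferSize() {
        return Flowable.bufferSize();
    }

    // Uses an operator overload that takes its internal buffer size explicitly.
    static List<Integer> withExplicitBuffer() {
        return Flowable.range(1, 5)
                .observeOn(Schedulers.computation(), false, 16) // delayError = false, bufferSize = 16
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("Default buffer size: " + defaultBufferSize());
        System.out.println(withExplicitBuffer());
    }
}
```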
The documentation for this class makes use of marble diagrams.
[Figure: legend for the marble diagrams]

A Flowable is similar to an Observable in that it represents a stream of data or events that can be observed and processed by Observers. However, Flowable adds the ability to handle backpressure, which is crucial when dealing with large amounts of data or slow consumers. Backpressure refers to the mechanism of controlling the flow of data so that the consumer can process it at its own pace, avoiding overwhelming or dropping data.
Flowables support backpressure and require Subscribers to signal demand via Subscription.request(long).
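To make the demand signalling concrete, here is a minimal sketch of a Subscriber that asks for items in batches instead of requesting everything at once. The class DemandDemo, the method run and the batch size of 2 are assumptions made for illustration.

```java
import io.reactivex.rxjava3.core.Flowable;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class DemandDemo {
    // Hypothetical helper: consumes Flowable.range(1, 5) two items at a time
    // and records each request and each signal in order.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flowable.range(1, 5).subscribe(new Subscriber<Integer>() {
            private Subscription subscription;
            private int received;

            @Override
            public void onSubscribe(Subscription s) {
                subscription = s;
                log.add("request 2");
                s.request(2); // nothing is emitted until demand is signalled
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext " + item);
                // After every second item, signal demand for the next batch.
                if (++received % 2 == 0) {
                    log.add("request 2");
                    subscription.request(2);
                }
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError " + t);
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The source never emits more items than have been requested, which is how a slow consumer keeps a fast producer in check.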
Examples – Creating a Flowable
Flowables can be created in a similar way to Observables. Here are a few common methods:
Example#1 – Create
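The Flowable.create() method lets you create a Flowable manually. Inside the callback passed to create(), you define the logic for emitting items to the Subscribers; the second argument selects the BackpressureStrategy applied when items are produced faster than they are requested. For example: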
Flowable<String> flowable = Flowable.create(emitter -> {
    emitter.onNext("Hello");
    emitter.onNext("World");
    emitter.onComplete();
}, BackpressureStrategy.BUFFER);
Example#2 – Just
Flowable.just()
Flowable<Integer> flowable = Flowable.just(1, 2, 3, 4, 5);
Example#3 – FromIterable
Flowable.fromIterable()
Example#4 – Subscribing to a Flowable
// RxJava does not allow null items: a null element in the source Iterable
// would be signalled as a NullPointerException through onError.
Flowable<String> flowable = Flowable.fromIterable(Arrays.asList("one", "two", "three")).skipLast(1);
flowable.subscribe(new Subscriber<String>() {
    @Override
    public void onNext(String item) {
        // Handle the emitted item
        System.out.println(item);
    }

    @Override
    public void onError(Throwable error) {
        // Handle errors
        error.printStackTrace();
    }

    @Override
    public void onComplete() {
        // Handle completion
        System.out.println("Flowable completed");
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        subscription.request(Long.MAX_VALUE); // Request all items
    }
});
Maybe
http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Maybe.html
Maybe is a special kind of Observable which can only emit zero or one item, and report an error if the computation fails at some point. We can work with Maybe like a Flowable as long as the operation makes sense for 0 or 1 items.
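The three possible outcomes of a Maybe (a value, no value, or an error) can be sketched as follows. The class MaybeOutcomesDemo and the method describe are hypothetical names; the sketch relies on the sources being synchronous, so the callback has already run when subscribe returns.

```java
import io.reactivex.rxjava3.core.Maybe;
import java.util.ArrayList;
import java.util.List;

public class MaybeOutcomesDemo {
    // Hypothetical helper: reports which of the three terminal callbacks was invoked.
    // Exactly one of them fires for any given Maybe.
    static String describe(Maybe<String> source) {
        List<String> result = new ArrayList<>();
        source.subscribe(
                value -> result.add("onSuccess: " + value),
                error -> result.add("onError: " + error.getMessage()),
                () -> result.add("onComplete (empty)"));
        return result.get(0);
    }

    public static void main(String[] args) {
        System.out.println(describe(Maybe.just("hi")));
        System.out.println(describe(Maybe.empty()));
        System.out.println(describe(Maybe.error(new RuntimeException("fail"))));
    }
}
```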
Disposable d = Maybe.just("Hello World")
    .delay(10, TimeUnit.SECONDS, Schedulers.io())
    .subscribeWith(new DisposableMaybeObserver<String>() {
        @Override
        public void onStart() {
            System.out.println("Started");
        }

        @Override
        public void onSuccess(String value) {
            System.out.println("Success: " + value);
        }

        @Override
        public void onError(Throwable error) {
            error.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("Done!");
        }
    });
// The value is delayed by 10 seconds, so disposing after 5 seconds
// cancels the flow before onSuccess is called.
Thread.sleep(5000);
d.dispose();
Single
http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Single.html
Single behaves similarly to Observable except that it can only emit either a single successful value or an error (there is no onComplete notification as there is for an Observable).
A Single therefore delivers exactly one of two signals to its observer:
- onSuccess(T value) is called with the single emitted value.
- onError(Throwable error) is called if the computation fails.
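Both outcomes can be observed with a short sketch like this one. The class SingleOutcomesDemo and the method parse are hypothetical; the sketch uses Single.fromCallable with the two-argument subscribe(onSuccess, onError) overload.

```java
import io.reactivex.rxjava3.core.Single;

public class SingleOutcomesDemo {
    // Hypothetical helper: parses a number inside a Single and reports
    // which of the two callbacks was invoked.
    static String parse(String text) {
        String[] result = new String[1];
        Single.fromCallable(() -> Integer.parseInt(text))
                .subscribe(
                        value -> result[0] = "onSuccess: " + value,
                        error -> result[0] = "onError: " + error.getClass().getSimpleName());
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(parse("42"));  // the callable returns a value
        System.out.println(parse("abc")); // the callable throws, so onError is called
    }
}
```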
Example#1
Disposable d = Single.just("Hello World")
    .delay(10, TimeUnit.SECONDS, Schedulers.io())
    .subscribeWith(new DisposableSingleObserver<String>() {
        @Override
        public void onStart() {
            System.out.println("Started");
        }

        @Override
        public void onSuccess(String value) {
            System.out.println("Success: " + value);
        }

        @Override
        public void onError(Throwable error) {
            error.printStackTrace();
        }
    });
Thread.sleep(5000);
d.dispose();
Example#2
@Test
public void just02() {
    TestSubscriber<String> ts = new TestSubscriber<String>();
    Single.just("Welcome")
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) {
                return s + "B";
            }
        })
        .toFlowable().subscribe(ts);
    ts.assertValueSequence(Arrays.asList("WelcomeB"));
}
Completable
http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Completable.html
The Completable class represents a deferred computation without any value but only indication for completion or exception.
Completable behaves similarly to Observable except that it can only emit either a completion or error signal (there is no onNext or onSuccess as with the other reactive types).
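A minimal sketch of the two possible signals, plus chaining a follow-up source with andThen. The class CompletableDemo and the method run are hypothetical names; fromAction, error, andThen and subscribe(onComplete, onError) are RxJava 3 methods.

```java
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Single;
import java.util.ArrayList;
import java.util.List;

public class CompletableDemo {
    // Hypothetical helper: records what happens for a successful and a failing Completable.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // The action is deferred: it runs each time the Completable is subscribed to.
        Completable save = Completable.fromAction(() -> log.add("action ran"));
        save.subscribe(
                () -> log.add("onComplete"),
                error -> log.add("onError: " + error.getMessage()));

        // A failing Completable signals onError instead of onComplete.
        Completable.error(new IllegalStateException("failed"))
                .subscribe(
                        () -> log.add("onComplete"),
                        error -> log.add("onError: " + error.getMessage()));

        // andThen subscribes to the next source only after completion.
        // This is a second subscription to save, so the action runs again.
        String next = save.andThen(Single.just("next")).blockingGet();
        log.add("andThen: " + next);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```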
Example#1
Disposable d = Completable.complete()
    .delay(10, TimeUnit.SECONDS, Schedulers.io())
    .subscribeWith(new DisposableCompletableObserver() {
        @Override
        public void onStart() {
            System.out.println("Started");
        }

        @Override
        public void onError(Throwable error) {
            error.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("Done!");
        }
    });
Thread.sleep(5000);
d.dispose();
Example#2
Completable
    .complete()
    .subscribe(new DisposableCompletableObserver() {
        @Override
        public void onComplete() {
            System.out.println("Completed!");
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
    });
More Examples: https://www.programcreek.com/java-api-examples/?api=io.reactivex.Completable
