ReactiveX / RxJava

ReactiveX / RxJava

Example#1

package org.example;

import io.reactivex.rxjava3.core.Flowable;

public class Main {
    public static void main(String[] args) {
        Flowable.just("Hello World!").subscribe(System.out::println);
    }
}

Output

Hello World!

Example#2

package org.example;

import io.reactivex.rxjava3.core.Observable;

public class Main {
    public static void main(String[] args) {
        // Build an observable that emits an integer sequence.
        Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);

        // Subscribe to the observable and print each emitted value
        observable.subscribe(value -> System.out.println("Received: " + value));
    }
}

In this example, we create an observable sequence of integers using RxJava. Subsequently, we subscribe to this observable, and as it emits values, the subscribed observer reacts by printing the received values.

Output

Received: 1
Received: 2
Received: 3
Received: 4
Received: 5

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>ReactiveJavaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.reactivex.rxjava3</groupId>
            <artifactId>rxjava</artifactId>
            <version>3.1.8</version>
        </dependency>
    </dependencies>

</project>

RxJava: Basic Guide: https://jreact.com/index.php/2023/11/05/rxjava-basic-guide/

RxJava

RxJava is a Java based extension of ReactiveX. It provides implementation of ReactiveX project in Java. Following are the key characteristics of RxJava.

  • Extends the observer pattern.
  • Support sequences of data/events.
  • Provides operators to compose sequences together declaratively.
  • Handles threading, synchronization, thread-safety and concurrent data structures internally.

How Observable works

Observables represents the sources of data where as Observers (Subscribers) listen to them. In nutshell, an Observable emits items and a Subscriber then consumes these items.

Observable

  • Observable provides data once subscriber starts listening.
  • Observable can emit any number of items.
  • Observable can emit only signal of completion as well with no item.
  • Observable can terminate successfully.
  • Observable may never terminate. e.g. a button can be clicked any number of times.
  • Observable may throw error at any point of time.

Observer (Subscriber)

  • Observable can have multiple observers (subscribers).
  • When an Observable emits an item, each subscriber onNext() method gets invoked.
  • When an Observable finished emitting items, each subscriber onComplete() method gets invoked.
  • If an Observable emits error, each subscriber onError() method gets invoked.

Classes to Create Observables.

  • Flowable − 0..N flows, Emits 0 or n items. Supports Reactive-Streams and back-pressure.
  • Observable − 0..N flows ,but no back-pressure.
  • Single − 1 item or error. Can be treated as a reactive version of method call.
  • Completable − No item emitted. Used as a signal for completion or error. Can be treated as a reactive version of Runnable.
  • MayBe − Either No item or 1 item emitted. Can be treated as a reactive version of Optional.

Methods to Create Observables

In Observable Class:

  • just(T item) − Returns an Observable that signals the given (constant reference) item and then completes.
  • fromIterable(Iterable source) − Converts an Iterable sequence into an ObservableSource that emits the items in the sequence.
  • fromArray(T… items) − Converts an Array into an ObservableSource that emits the items in the Array.
  • fromCallable(Callable supplier) − Returns an Observable that, when an observer subscribes to it, invokes a function you specify and then emits the value returned from that function.
  • fromFuture(Future future) − Converts a Future into an ObservableSource.
  • interval(long initialDelay, long period, TimeUnit unit) − Returns an Observable that emits a 0L after the initialDelay and ever increasing numbers after each period of time thereafter.

Creating Observables

This list shows methods that create reactive sources, such as Observables.
See: https://github.com/ReactiveX/RxJava/wiki/Creating-Observables

targetType.from{sourceType}()

from{sourceType}() wraps or converts another reactive type to the target reactive type. The following combinations are available in the various reactive types with the following signature pattern: targetType.from{sourceType}(). Not all possible conversion is implemented via the from{reactive type} method families. Check out the to{reactive type} method families for further conversion possibilities.

Example: (see also: http://jreact.com/index.php/2023/10/27/rxjava-examples/ )

        Mono<Integer> reactorMono = Mono.fromCompletionStage(CompletableFuture.<Integer>completedFuture(1));

        Observable<Integer> observable = Observable.fromPublisher(reactorMono);

        observable.subscribe(
                item -> System.out.println(item),
                error -> error.printStackTrace(),
                () -> System.out.println("Done"));

Observable

http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Observable.html

public abstract class Observable<T> extends Object implements ObservableSource<T>

The Observable class is the non-backpressured, optionally multi-valued base reactive class that offers factory methods, intermediate operators and the ability to consume synchronous and/or asynchronous reactive dataflows.

Many operators in the class accept ObservableSource(s), the base reactive interface for such non-backpressured flows, which Observable itself implements as well.

The Observable‘s operators, by default, run with a buffer size of 128 elements (see Flowable.bufferSize()), that can be overridden globally via the system parameter rx3.buffer-size. Most operators, however, have overloads that allow setting their internal buffer size explicitly.

The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:

Examples – see: http://jreact.com/index.php/2023/10/27/rxjava-examples/

Example#1

Observable.just

        Observable<String> observable = Observable.just("Hello, world");
        observable.subscribe(System.out::println);

Example#2

Observable.fromIterable

        List<Integer> list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));

        Observable<Integer> observable = Observable.fromIterable(list);

        observable.subscribe(item -> System.out.println(item), error -> error.printStackTrace(),
                () -> System.out.println("Done"));

Flowable

http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html

public abstract class Flowable<T> extends Object implements Publisher<T>

The Flowable class that implements the Reactive Streams Publisher Pattern and offers factory methods, intermediate operators and the ability to consume reactive dataflows.

Reactive Streams operates with Publishers which Flowable extends. Many operators therefore accept general Publishers directly and allow direct interoperation with other Reactive Streams implementations.

The Flowable hosts the default buffer size of 128 elements for operators, accessible via bufferSize(), that can be overridden globally via the system parameter rx3.buffer-size. Most operators, however, have overloads that allow setting their internal buffer size explicitly.

The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:

A Flowable is similar to an Observable in that it represents a stream of data or events that can be observed and processed by Observers. However, Flowable adds the ability to handle backpressure, which is crucial when dealing with large amounts of data or slow consumers. Backpressure refers to the mechanism of controlling the flow of data so that the consumer can process it at its own pace, avoiding overwhelming or dropping data.

Flowables support backpressure and require Subscribers to signal demand via Subscription.request(long).

Examples – Creating a Flowable

Flowables can be created in a similar way to Observables. Here are a few common methods:

Example#1 – Create

Flowable.create() method allows you to manually create a Flowable. Within the create() method, you define the logic for emitting items to the Observers. For example:

Flowable<String> flowable = Flowable.create(emitter -> {
    emitter.onNext("Hello");
    emitter.onNext("World");
    emitter.onComplete();
}, BackpressureStrategy.BUFFER);

Example#2 – Just

Flowable.just()

Flowable<Integer> flowable = Flowable.just(1, 2, 3, 4, 5);

Example#3 – FromIterable

Flowable.fromIterable()

Example#4 – Subscribing to a Flowable

        Flowable<String> flowable = Flowable.fromIterable(Arrays.asList("one", null, "two")).skipLast(1);
        flowable.subscribe(new Subscriber<String>() {

            @Override
            public void onNext(String item) {
                // Handle the emitted item
                System.out.println(item);
            }

            @Override
            public void onError(Throwable error) {
                // Handle errors
                error.printStackTrace();
            }

            @Override
            public void onComplete() {
                // Handle completion
                System.out.println("Flowable completed");
            }

            @Override
            public void onSubscribe(Subscription subscription) {
                subscription.request(Long.MAX_VALUE); // Request all items
            }

        });

Maybe

http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Maybe.html

Maybe is a special kind of Observable which can only emit zero or one item, and report an error if the computation fails at some point. We can work with Maybe like a Flowable as long as the operation makes sense for 0 or 1 items.

        Disposable d = Maybe.just("Hello World")
                .delay(10, TimeUnit.SECONDS, Schedulers.io())
                .subscribeWith(new DisposableMaybeObserver<String>() {
                    @Override
                    public void onStart() {
                        System.out.println("Started");
                    }

                    @Override
                    public void onSuccess(String value) {
                        System.out.println("Success: " + value);
                    }

                    @Override
                    public void onError(Throwable error) {
                        error.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("Done!");
                    }
                });

        Thread.sleep(5000);

        d.dispose();

Single

http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Single.html

Single behaves similarly to Observable except that it can only emit either a single successful value or an error (there is no onComplete notification as there is for an Observable).

With this source of data, we can only use two methods to subscribe:

  • OnSuccess returns a Single that also calls a method we specify
  • OnError also returns a Single that immediately notifies subscribers of an error

Example#1

 Disposable d = Single.just("Hello World")
    .delay(10, TimeUnit.SECONDS, Schedulers.io())
    .subscribeWith(new DisposableSingleObserver<String>() {
        @Override
        public void onStart() {
            System.out.println("Started");
        }

        @Override
        public void onSuccess(String value) {
            System.out.println("Success: " + value);
        }

        @Override
        public void onError(Throwable error) {
            error.printStackTrace();
        }
    });
 
 Thread.sleep(5000);
 
 d.dispose();

Example#2

    @Test
    public void just02() {
        TestSubscriber<String> ts = new TestSubscriber<String>();
        Single.just("Welcome")
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) {
                        return s + "B";
                    }
                })
                .toFlowable().subscribe(ts);
        ts.assertValueSequence(Arrays.asList("WelcomeB"));
    }

Completable

http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Completable.html

The Completable class represents a deferred computation without any value but only indication for completion or exception.

Completable behaves similarly to Observable except that it can only emit either a completion or error signal (there is no onNext or onSuccess as with the other reactive types).

Example#1

        Disposable d = Completable.complete()
                .delay(10, TimeUnit.SECONDS, Schedulers.io())
                .subscribeWith(new DisposableCompletableObserver() {
                    @Override
                    public void onStart() {
                        System.out.println("Started");
                    }

                    @Override
                    public void onError(Throwable error) {
                        error.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("Done!");
                    }
                });

        Thread.sleep(5000);
        d.dispose();   

Example#2

        Completable
                .complete()
                .subscribe(new DisposableCompletableObserver() {
                    @Override
                    public void onComplete() {
                        System.out.println("Completed!");
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }
                });

More Examples: https://www.programcreek.com/java-api-examples/?api=io.reactivex.Completable


Code: http://jreact.com/index.php/2023/10/27/rxjava-examples/