ReactiveX

About

ReactiveX is a functional reactive programming library, implemented in several languages.

It composes asynchronous and event-based programs using observable asynchronous sequences of multiple items.

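It is based on reactive streams.

ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.

ReactiveX operates on discrete values that are emitted over time.

It is also known as LINQ to Events.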

The goal of Rx is to coordinate and orchestrate :

Use case

Visualisation

Marble diagrams are interactive diagrams that depict how Rx operators work with observable sequences.

DataFlow

The dataflows consist of a source, zero or more intermediate steps, and a data consumer or combinator step.

The object traveling along the dataflow is referred to interchangeably as an emission, item, event, signal, data, or message.

Main flow

source
    .operator1()
    .operator2()    // ↑↑↑ upstream   ↑↑↑
    .operator3()    // ↓↓↓ downstream ↓↓↓
    .subscribe(consumer);

Sub flow

source
  .flatMap(value -> source
      .operator1()
      .operator2()
      .operator3()
  );

Example:

Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
inventorySource
    .flatMap(inventoryItem -> 
            erp.getDemandAsync(inventoryItem.getId())
            .map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand)
            )
    .subscribe(System.out::println);

Model

Observable

You can think of the Observable class as a “push” equivalent to Iterable, which is a “pull.” With an Iterable, the consumer pulls values from the producer and the thread blocks until those values arrive. By contrast, with an Observable the producer pushes values to the consumer whenever values are available. This approach is more flexible, because values can arrive synchronously or asynchronously.

Event            Iterable (Pull)     Observable (Push)
---------------  ------------------  ------------------
retrieve data    T next()            onNext(T)
discover error   throws Exception    onError(Exception)
complete         returns             onCompleted()

Note: a BlockingObservable behaves very much like an Iterable.
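
The pull/push contrast in the table can be sketched with the JDK alone. This is not RxJava's implementation: `PushVsPull`, `pull`, and `push` are hypothetical names, and the three callbacks merely mirror onNext, onError, and onCompleted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class PushVsPull {

    // Pull: the consumer drives the loop and asks for each value with next().
    static List<String> pull(Iterable<String> producer) {
        List<String> received = new ArrayList<>();
        Iterator<String> it = producer.iterator();
        while (it.hasNext()) {
            received.add(it.next());
        }
        return received;
    }

    // Push (hypothetical producer): the producer drives the loop and calls
    // the consumer's callbacks, mirroring onNext / onError / onCompleted.
    static void push(List<String> values,
                     Consumer<String> onNext,
                     Consumer<Throwable> onError,
                     Runnable onCompleted) {
        try {
            for (String value : values) {
                onNext.accept(value);
            }
        } catch (RuntimeException e) {
            onError.accept(e);   // terminal signal: no completion after an error
            return;
        }
        onCompleted.run();       // terminal signal: no more values
    }

    public static void main(String[] args) {
        List<String> data = List.of("a", "b", "c");
        System.out.println("pulled: " + pull(data));

        List<String> pushed = new ArrayList<>();
        push(data, pushed::add, Throwable::printStackTrace,
             () -> System.out.println("pushed: " + pushed));
    }
}
```

In the pull version the caller decides when to ask for the next value; in the push version the producer decides when to deliver it, which is what lets delivery happen on another thread.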

Example:

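// Iterable (pull): the consumer blocks until each value is available
getDataFromLocalMemory()
  .skip(10)
  .take(5)
  .map({ s -> return s + " transformed" })
  .forEach({ println "next => " + it })

// Observable (push): the same operators, but values are pushed as they arrive
getDataFromNetwork()
  .skip(10)
  .take(5)
  .map({ s -> return s + " transformed" })
  .subscribe({ println "onNext => " + it })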

The contract of an Observable is that:

The ReactiveX Observable model:

               single items          multiple items
-------------  --------------------  -----------------------
synchronous    T getData()           Iterable<T> getData()
asynchronous   Future<T> getData()   Observable<T> getData()

where: Asynchronous - Future
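
As a rough illustration of the four shapes, here is a JDK-only sketch. Since Observable<T> is not part of the JDK, `java.util.concurrent.SubmissionPublisher` is used as a stand-in for the asynchronous, multiple-items case. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

public class FourShapes {

    // Synchronous, single item: the caller blocks and gets one value.
    static String getOne() {
        return "a";
    }

    // Synchronous, multiple items: the caller iterates (pulls) the values.
    static Iterable<String> getMany() {
        return List.of("a", "b", "c");
    }

    // Asynchronous, single item: CompletableFuture implements Future<T>.
    static CompletableFuture<String> getOneAsync() {
        return CompletableFuture.supplyAsync(() -> "a");
    }

    // Asynchronous, multiple items: a JDK stand-in for Observable<T>.
    // Items are pushed to the consumer on another thread.
    static List<String> getManyAsync() {
        List<String> received = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        // consume() subscribes and returns a future that completes
        // once the publisher is closed and all items were delivered.
        CompletableFuture<Void> done = publisher.consume(received::add);
        for (String s : getMany()) {
            publisher.submit(s);
        }
        publisher.close();
        done.join();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(getOne());
        System.out.println(getMany());
        System.out.println(getOneAsync().join());
        System.out.println(getManyAsync());
    }
}
```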

Operator

ReactiveX allows you to chain operators together to transform and compose Observables. This style of method chaining is called a fluent API.

RxJava's reactive types are immutable; each of the method calls returns a new Flowable with added behavior.

Data Flow State

Assembly time

Assembly time is the preparation of dataflows by applying various intermediate operators.

With the code below, the data is not flowing yet and no side effects occur:

Flowable<Integer> flow = Flowable.range(1, 5)
    .map(v -> v * v)
    .filter(v -> v % 3 == 0);

Subscription time

This is a temporary state when subscribe() is called on a flow that establishes the chain of processing steps internally:

flow.subscribe(System.out::println);

This is when the subscription side-effects are triggered (see doOnSubscribe). Some sources block or start emitting items right away in this state.
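
The difference between assembly time and subscription time can be made visible with a toy pipeline. The sketch below is not how RxJava is implemented: `LazyFlow` is a hypothetical, synchronous, JDK-only class whose operators only wrap the upstream recipe, so nothing runs until subscribe() is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

public class LazyFlow<T> {

    // A flow is only a recipe: "given a consumer, push my items into it".
    private final Consumer<Consumer<T>> onSubscribe;

    private LazyFlow(Consumer<Consumer<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    static LazyFlow<Integer> range(int start, int count) {
        return new LazyFlow<Integer>(downstream -> {
            for (int i = start; i < start + count; i++) {
                downstream.accept(i);
            }
        });
    }

    // Each operator returns a new flow and leaves the original untouched.
    <R> LazyFlow<R> map(Function<T, R> mapper) {
        return new LazyFlow<R>(downstream ->
            onSubscribe.accept(item -> downstream.accept(mapper.apply(item))));
    }

    LazyFlow<T> filter(Predicate<T> keep) {
        return new LazyFlow<T>(downstream ->
            onSubscribe.accept(item -> {
                if (keep.test(item)) {
                    downstream.accept(item);
                }
            }));
    }

    // Subscription time: the chain is connected and items start flowing.
    void subscribe(Consumer<T> consumer) {
        onSubscribe.accept(consumer);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();

        // Assembly time: only the recipe is built.
        LazyFlow<Integer> flow = LazyFlow.range(1, 5)
            .map(v -> { log.add("map " + v); return v * v; })
            .filter(v -> v % 3 == 0);
        System.out.println("calls to map after assembly: " + log.size());

        // Subscription and runtime: the mapper is now invoked per item.
        flow.subscribe(v -> System.out.println("received " + v));
        System.out.println("calls to map after subscribe: " + log.size());
    }
}
```

Because every operator returns a new object, the same assembled flow can be subscribed to several times, and each subscription runs the whole chain again.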

Runtime

This is the state in which flows are actively emitting items, errors, or completion signals:

Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

Practically, this is when the body of the given example above executes.

Flows

RxJava 3 features several base classes (flow) where you can discover operators.

0..N flows

0..1 flows

1 item or no item flow

Parallel

Flows are sequential in nature, split into processing stages that may run concurrently with each other.

Parallelism means running independent flows and merging their results back into a single flow.

Example:

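Flowable.range(1, 10)
    .flatMap(v -> Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
    )
    .blockingSubscribe(System.out::println);

where the operator flatMap maps each number from 1 to 10 into its own Flowable, runs them, and merges the computed squares. Note that flatMap does not guarantee any order in the merged output.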

Operators:

Parallel flows can also be started with Flowable.parallel(), which returns a ParallelFlowable:

Flowable.range(1, 10)
    .parallel()
    .runOn(Schedulers.computation())
    .map(v -> v * v)
    .sequential()
    .blockingSubscribe(System.out::println);
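
The idea of "run independent computations, then merge the results" can also be shown without RxJava. The following JDK-only sketch uses `CompletableFuture`, and `ForkAndMerge` and `squaresInParallel` are hypothetical names. Unlike flatMap, this version joins the results in input order, so its output is deterministic.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ForkAndMerge {

    static List<Integer> squaresInParallel(int from, int to) {
        // Fork: start one independent asynchronous computation per value.
        List<CompletableFuture<Integer>> inner = IntStream.rangeClosed(from, to)
            .mapToObj(v -> CompletableFuture.supplyAsync(() -> v * v))
            .collect(Collectors.toList());

        // Merge: wait for each computation and gather the results
        // into a single list, keeping the input order.
        return inner.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(squaresInParallel(1, 10));
    }
}
```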

Usage

To use RxJava, you

Creating Observable

It extends the observer pattern to:

Observables will:

You can implement asynchronous I/O, computational operations, or even infinite streams of data by designing your own Observable and implementing it with the create() method.

Transforming Observables with Operators

Error Handling

https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators

High Latency Computation

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

Concept

Schedulers

A scheduler is a thread abstraction (a uniform concurrency API).

ReactiveX operators work with Schedulers, not directly with Threads or ExecutorServices.

The standard schedulers are accessible via the Schedulers utility class.

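In RxJava the default Schedulers run on daemon threads, which means that once the Java main thread exits, they are all stopped and background computations may never happen.

Sleeping for some time (Thread.sleep(2000)) keeps the main thread alive long enough to see the output of the flow on the console.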
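
The daemon-thread behaviour is a plain JVM property and can be reproduced without RxJava. The sketch below is a JDK-only illustration with hypothetical names (`DaemonWorker`, `runOnDaemonThread`). It runs a task on a daemon thread and waits for it with a `CountDownLatch`, which is a more deterministic alternative to sleeping for a fixed time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class DaemonWorker {

    static String runOnDaemonThread() {
        // Daemon threads do not keep the JVM alive on their own.
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "background-worker");
            thread.setDaemon(true);
            return thread;
        });

        AtomicReference<String> result = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            result.set("Done on " + Thread.currentThread().getName());
            done.countDown();
        });

        try {
            // Without this wait, the caller could return (and main could exit)
            // before the background task has had a chance to run.
            boolean finished = done.await(2, TimeUnit.SECONDS);
            return finished ? result.get() : "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnDaemonThread());
    }
}
```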

Documentation / Reference