ReactiveX

About

ReactiveX is Functional reactive programming library (implemented in several languages)

It composes asynchronous and event-based programs using observable asynchronous sequences of multiple items.

It's based on reactive stream.

ReactiveX is a combination of:

ReactiveX operates on discrete values that are emitted over time

also known as LINQ to Events

The goal of Rx is to coordinate and orchestrate :

  • event-based - low latency - (event-based app)
  • and asynchronous - high-latency - computations (Web sockets, Web services, ..)

Use case

  • run some computation, network request on a background thread and show the results (or error) on the UI thread.

Visualisation

Marble diagrams are interactive diagrams that depict how Rx operators work with observable sequences.

DataFlow

The dataflows consist of:

  • a source,
  • zero or more intermediate steps
  • followed by a data consumer or combinator step (where the step is responsible to consume the dataflow by some means)

The object traveling along the dataflow is known as emission, emits, item, event, signal, data and message represent .

Main flow

source
    .operator1()
    .operator2()    // ↑↑↑ upstream   ↑↑↑
    .operator3()    // ↓↓↓ downstream ↓↓↓
    .subscribe(consumer);

Sub flow

source
  .flatMap(value -> source
      .operator1()
      .operator2()
      .operator3()
  );

Example:

Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
inventorySource
    .flatMap(inventoryItem -> 
            erp.getDemandAsync(inventoryItem.getId())
            .map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand)
            )
    .subscribe(System.out::println);

Model

Observable

You can think of the Observable class as a “push” equivalent to Iterable, which is a “pull.” With an Iterable, the consumer pulls values from the producer and the thread blocks until those values arrive. By contrast, with an Observable the producer pushes values to the consumer whenever values are available. This approach is more flexible, because values can arrive synchronously or asynchronously.

Event Iterable (Pull) Observable (Push)
retrieve data T next() onNext(T)
discover error throws Exception onError(Exception)
complete returns onCompleted()

note: a BlockingObservable behaves very much like a Iterable .

Example:

  • Iterable
getDataFromLocalMemory()
  .skip(10)
  .take(5)
  .map({ s -> return s + " transformed" })
  .forEach({ println "next => " + it })
  • Observable
getDataFromNetwork()
  .skip(10)
  .take(5)
  .map({ s -> return s + " transformed" })
  .subscribe({ println "onNext => " + it })

The contract of a Observable is that:

  • zero to N data events can happen,
  • terminated by:
    • a complete event
    • or an error

The ReactiveX Observable model:

single items multiple items
synchronous T getData() Iterable<T> getData()
asynchronous Future<T> getData() Observable<T> getData()

where: Asynchronous - Future

Operator

ReactiveX allows you to chain operators together to transform and compose Observables. This style of chaining methods is called a fluent API.

RxJava's reactive types are immutable; each of the method calls returns a new Flowable with added behavior.

Data Flow State

Assembly time

assembly time is the preparation of dataflows by applying various intermediate operators

With the below code, the data is not flowing yet

Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;

Subscription time

This is a temporary state when subscribe() is called on a flow that establishes the chain of processing steps internally:

flow.subscribe(System.out::println)

This is when the subscription side-effects are triggered (see doOnSubscribe). Some sources block or start emitting items right away in this state.

Runtime

This is the state when the flows are actively emitting items, errors or completion signals

Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

Practically, this is when the body of the given example above executes.

Flows

RxJava 3 features several base classes (flow) where you can discover operators.

0..N flows

0..1 flows

1 item or no item flow

Parallel

Flows are sequential in nature split into processing stages that may run concurrently with each other.

It means running independent flows and merging their results back into a single flow.

Example:

Flowable.range(1, 10)
  .flatMap(v ->Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

where the operator flatMap

  • maps each number from 1 to 10 into its own individual Flowable
  • runs them
  • and merges the computed squares back into the main flow

Operators:

  • flatMap doesn't guarantee any order and the items from the inner flows may end up interleaved.
  • concatMap that maps and runs one inner flow at a time
  • concatMapEager which runs all inner flows “at once” but the output flow will be in the order those inner flows were created.

Parallel flows may be initiated also with the Flowable Parallel

Flowable.range(1, 10)
                .parallel()
                .runOn(Schedulers.computation())
                .map(v -> v * v)
                .sequential()
                .blockingSubscribe(System.out::println);

Usage

To use RxJava, you

  • create Observables (which emit data items), Doc - Creating-Observables
  • transform those Observables with Observable operators (sort of functional programming method)
  • and then observe and react to these sequences of interesting items (by:
    • implementing Observers or Subscribers
    • and then subscribing them to the resulting transformed Observables).

Creating Observable

It extends the observer pattern to:

  • support sequences of data and/or events
  • and adds operators

Observables will:

  • synchronously invoke the onNext( ) method of any subscriber that subscribes to them, for each item to be emitted by the Observable,
  • will then invoke the subscriber’s onCompleted( ) method.

You can implement asynchronous i/o, computational operations, or even infinite streams of data by designing your own Observable and implementing it with the create( ) method.

Transforming Observables with Operators

Error Handling

https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators

High Latency Computation

  • Typically, you can move computations or blocking IO to some other thread via subscribeOn.
  • Once the data is ready, you can make sure they get processed on the foreground or GUI thread via observeOn.
Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

Concept

Schedulers

A scheduler is a thread abstraction (a uniform concurrency API)

ReactiveX operators work with Schedulers (not directly with Threads or ExecutorServices)

The below standard schedulers are accessible via the Schedulers utility class.

  • Schedulers.computation(): Run computation intensive work on a fixed number of dedicated threads in the background. Most asynchronous operators use this as their default Scheduler.
  • Schedulers.io(): Run I/O-like or blocking operations on a dynamically changing set of threads.
  • Schedulers.single(): Run work on a single thread in a sequential and FIFO manner.
  • Schedulers.trampoline(): Run work in a sequential and FIFO manner in one of the participating threads, usually for testing purposes.
  • Schedulers.from(Executor): wrap an existing Executor (and its subtypes such as ExecutorService) into a Scheduler to have a larger but still fixed pool of threads (unlike computation() and io() respectively).
  • Only on Android: AndroidSchedulers.mainThread(),
  • Only on Swing: SwingScheduler.instance()
  • Only on JavaFx: JavaFXSchedulers.gui().

In RxJava the default Schedulers run on daemon threads, which means:

  • once the Java main thread exits,
  • they all get stopped and background computations may never happen.

Sleeping for some time (Thread.sleep(2000)) lets you see the output of the flow on the console.

Documentation / Reference


Powered by ComboStrap