About
ReactiveX is Functional reactive programming library (implemented in several languages)
It composes asynchronous and event-based programs using observable asynchronous sequences of multiple items.
It's based on reactive stream.
ReactiveX is a combination of:
- the Observer pattern,
- the Iterator pattern,
ReactiveX operates on discrete values that are emitted over time
also known as LINQ to Events
The goal of Rx is to coordinate and orchestrate :
- event-based - low latency - (event-based app)
- and asynchronous - high-latency - computations (Web sockets, Web services, ..)
Use case
- run some computation, network request on a background thread and show the results (or error) on the UI thread.
Visualisation
Marble diagrams are interactive diagrams that depict how Rx operators work with observable sequences.
- https://rxfiddle.net/ - visualize
DataFlow
The dataflows consist of:
- a source,
- zero or more intermediate steps
- followed by a data consumer or combinator step (where the step is responsible to consume the dataflow by some means)
The object traveling along the dataflow is known as emission, emits, item, event, signal, data and message represent .
Main flow
source
.operator1()
.operator2() // ↑↑↑ upstream ↑↑↑
.operator3() // ↓↓↓ downstream ↓↓↓
.subscribe(consumer);
Sub flow
source
.flatMap(value -> source
.operator1()
.operator2()
.operator3()
);
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
inventorySource
.flatMap(inventoryItem ->
erp.getDemandAsync(inventoryItem.getId())
.map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand)
)
.subscribe(System.out::println);
Model
Observable
You can think of the Observable class as a “push” equivalent to Iterable, which is a “pull.” With an Iterable, the consumer pulls values from the producer and the thread blocks until those values arrive. By contrast, with an Observable the producer pushes values to the consumer whenever values are available. This approach is more flexible, because values can arrive synchronously or asynchronously.
Event | Iterable (Pull) | Observable (Push) |
---|---|---|
retrieve data | T next() | onNext(T) |
discover error | throws Exception | onError(Exception) |
complete | returns | onCompleted() |
note: a BlockingObservable behaves very much like a Iterable .
Example:
- Iterable
getDataFromLocalMemory()
.skip(10)
.take(5)
.map({ s -> return s + " transformed" })
.forEach({ println "next => " + it })
- Observable
getDataFromNetwork()
.skip(10)
.take(5)
.map({ s -> return s + " transformed" })
.subscribe({ println "onNext => " + it })
The contract of a Observable is that:
- zero to N data events can happen,
- terminated by:
- a complete event
- or an error
The ReactiveX Observable model:
- allows to treat streams of asynchronous events with the same sort of simple, composable operations that you use for collections of data items like arrays - Observables Are Composable - Observables are intended for composing flows and sequences of asynchronous data.
- frees from tangled webs of callbacks
single items | multiple items | |
---|---|---|
synchronous | T getData() | Iterable<T> getData() |
asynchronous | Future<T> getData() | Observable<T> getData() |
where: Asynchronous - Future
Operator
ReactiveX allows you to chain operators together to transform and compose Observables. This style of chaining method is called a fluent API.
RxJava's reactive types are immutable; each of the method calls returns a new Flowable with added behavior.
Data Flow State
Assembly time
assembly time is the preparation of dataflows by applying various intermediate operators
With the below code, the data is not flowing yet
Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;
Subscription time
This is a temporary state when subscribe() is called on a flow that establishes the chain of processing steps internally:
flow.subscribe(System.out::println)
This is when the subscription side-effects are triggered (see doOnSubscribe). Some sources block or start emitting items right away in this state.
Runtime
This is the state when the flows are actively emitting items, errors or completion signals
Observable.create(emitter -> {
while (!emitter.isDisposed()) {
long time = System.currentTimeMillis();
emitter.onNext(time);
if (time % 2 != 0) {
emitter.onError(new IllegalStateException("Odd millisecond!"));
break;
}
}
})
.subscribe(System.out::println, Throwable::printStackTrace);
Practically, this is when the body of the given example above executes.
Flows
RxJava 3 features several base classes (flow) where you can discover operators.
0..N flows
- RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html is designated to support backpressure (supporting Reactive-Streams)
- RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Observable.html is dedicated to the non-backpressured operations (short sequences, GUI interactions, etc.).
0..1 flows
1 item or no item flow
- at most one item with index 0: RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Single.html: a flow of exactly 1 item or an error,
- always one item with index 0: RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Completable.html: a flow without items but only a completion or error signal,
- always empty: RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Maybe.html: a flow with no items, exactly one item or an error.
Parallel
Flows are sequential in nature split into processing stages that may run concurrently with each other.
It means running independent flows and merging their results back into a single flow.
Example:
Flowable.range(1, 10)
.flatMap(v ->Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.blockingSubscribe(System.out::println);
where the operator flatMap
- maps each number from 1 to 10 into its own individual Flowable
- runs them
- and merges the computed squares back into the main flow
Operators:
- flatMap doesn't guarantee any order and the items from the inner flows may end up interleaved.
- concatMap that maps and runs one inner flow at a time
- concatMapEager which runs all inner flows “at once” but the output flow will be in the order those inner flows were created.
Parallel flows may be initiated also with the Flowable Parallel
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
Usage
To use RxJava, you
- create Observables (which emit data items), Doc - Creating-Observables
- transform those Observables with Observable operators (sort of functional programming method)
- and then observe and react to these sequences of interesting items (by:
- implementing Observers or Subscribers
- and then subscribing them to the resulting transformed Observables).
Creating Observable
It extends the observer pattern to:
- support sequences of data and/or events
- and adds operators
Observables will:
- synchronously invoke the onNext( ) method of any subscriber that subscribes to them, for each item to be emitted by the Observable,
- will then invoke the subscriber’s onCompleted( ) method.
You can implement asynchronous i/o, computational operations, or even infinite streams of data by designing your own Observable and implementing it with the create( ) method.
Transforming Observables with Operators
Error Handling
https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators
- onErrorResumeNext() and onErrorReturn() allow Observables to continue with fallbacks in the event that they encounter errors.
High Latency Computation
- Typically, you can move computations or blocking IO to some other thread via subscribeOn.
- Once the data is ready, you can make sure they get processed on the foreground or GUI thread via observeOn.
Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- wait for the flow to finish
Concept
Schedulers
A scheduler is a thread abstraction (a uniform concurrency API)
ReactiveX operators work with Schedulers (not directly with Threads or ExecutorServices)
The below standard schedulers are accessible via the Schedulers utility class.
- Schedulers.computation(): Run computation intensive work on a fixed number of dedicated threads in the background. Most asynchronous operators use this as their default Scheduler.
- Schedulers.io(): Run I/O-like or blocking operations on a dynamically changing set of threads.
- Schedulers.single(): Run work on a single thread in a sequential and FIFO manner.
- Schedulers.trampoline(): Run work in a sequential and FIFO manner in one of the participating threads, usually for testing purposes.
- Schedulers.from(Executor): wrap an existing Executor (and its subtypes such as ExecutorService) into a Scheduler to have a larger but still fixed pool of threads (unlike computation() and io() respectively).
- Only on Android: AndroidSchedulers.mainThread(),
- Only on Swing: SwingScheduler.instance()
- Only on JavaFx: JavaFXSchedulers.gui().
In RxJava the default Schedulers run on daemon threads, which means:
- once the Java main thread exits,
- they all get stopped and background computations may never happen.
Sleeping for some time (Thread.sleep(2000)) lets you see the output of the flow on the console.