ReactiveX is Functional reactive programming library (implemented in several languages)
It composes asynchronous and event-based programs using observable asynchronous sequences of multiple items.
It's based on reactive stream.
ReactiveX is a combination of:
ReactiveX operates on discrete values that are emitted over time
also known as LINQ to Events
The goal of Rx is to coordinate and orchestrate :
Marble diagrams are interactive diagrams that depict how Rx operators work with observable sequences.
The dataflows consist of:
The object traveling along the dataflow is known as emission, emits, item, event, signal, data and message represent .
source
.operator1()
.operator2() // ↑↑↑ upstream ↑↑↑
.operator3() // ↓↓↓ downstream ↓↓↓
.subscribe(consumer);
source
.flatMap(value -> source
.operator1()
.operator2()
.operator3()
);
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
inventorySource
.flatMap(inventoryItem ->
erp.getDemandAsync(inventoryItem.getId())
.map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand)
)
.subscribe(System.out::println);
You can think of the Observable class as a “push” equivalent to Iterable, which is a “pull.” With an Iterable, the consumer pulls values from the producer and the thread blocks until those values arrive. By contrast, with an Observable the producer pushes values to the consumer whenever values are available. This approach is more flexible, because values can arrive synchronously or asynchronously.
Event | Iterable (Pull) | Observable (Push) |
---|---|---|
retrieve data | T next() | onNext(T) |
discover error | throws Exception | onError(Exception) |
complete | returns | onCompleted() |
note: a BlockingObservable behaves very much like a Iterable .
Example:
getDataFromLocalMemory()
.skip(10)
.take(5)
.map({ s -> return s + " transformed" })
.forEach({ println "next => " + it })
getDataFromNetwork()
.skip(10)
.take(5)
.map({ s -> return s + " transformed" })
.subscribe({ println "onNext => " + it })
The contract of a Observable is that:
The ReactiveX Observable model:
single items | multiple items | |
---|---|---|
synchronous | T getData() | Iterable<T> getData() |
asynchronous | Future<T> getData() | Observable<T> getData() |
where: Asynchronous - Future
ReactiveX allows you to chain operators together to transform and compose Observables. This style of chaining method is called a fluent API.
RxJava's reactive types are immutable; each of the method calls returns a new Flowable with added behavior.
assembly time is the preparation of dataflows by applying various intermediate operators
With the below code, the data is not flowing yet
Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;
This is a temporary state when subscribe() is called on a flow that establishes the chain of processing steps internally:
flow.subscribe(System.out::println)
This is when the subscription side-effects are triggered (see doOnSubscribe). Some sources block or start emitting items right away in this state.
This is the state when the flows are actively emitting items, errors or completion signals
Observable.create(emitter -> {
while (!emitter.isDisposed()) {
long time = System.currentTimeMillis();
emitter.onNext(time);
if (time % 2 != 0) {
emitter.onError(new IllegalStateException("Odd millisecond!"));
break;
}
}
})
.subscribe(System.out::println, Throwable::printStackTrace);
Practically, this is when the body of the given example above executes.
RxJava 3 features several base classes (flow) where you can discover operators.
1 item or no item flow
Flows are sequential in nature split into processing stages that may run concurrently with each other.
It means running independent flows and merging their results back into a single flow.
Example:
Flowable.range(1, 10)
.flatMap(v ->Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.blockingSubscribe(System.out::println);
where the operator flatMap
Operators:
Parallel flows may be initiated also with the Flowable Parallel
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
To use RxJava, you
It extends the observer pattern to:
Observables will:
You can implement asynchronous i/o, computational operations, or even infinite streams of data by designing your own Observable and implementing it with the create( ) method.
https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators
Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- wait for the flow to finish
A scheduler is a thread abstraction (a uniform concurrency API)
ReactiveX operators work with Schedulers (not directly with Threads or ExecutorServices)
The below standard schedulers are accessible via the Schedulers utility class.
In RxJava the default Schedulers run on daemon threads, which means:
Sleeping for some time (Thread.sleep(2000)) lets you see the output of the flow on the console.