About
Reactive Streams is a standard and specification for reactive data processing (ie observer pattern, asynchronous processing).
The characteristics are:
- non-blocking backpressure
- potentially infinite streams
It is also known as Observable reactive dataflow because it is all about observing a stream of flow-controlled components in which:
- Publishers produce items that are consumed by one or more Subscribers,
- each Subscriber is managed by a Subscription.
This is a push-based system (observer pattern): items are pushed to a Subscriber, but never more than it has requested.
Definition
A reactive stream processes:
- a potentially unbounded number of elements (data whose volume is not predetermined),
- in sequence,
- asynchronously, passing elements in a producer/consumer model:
- from the source of the data (producer, upstream)
- to the destination of the data (consumer, downstream)
- through one or more processing stages,
- with mandatory non-blocking backpressure (backpressure is an integral part of the model).
The fact that the downstream can be slower than the upstream is part of the domain model. A consumer that reads too slowly can even be disconnected, as with the Twitter streaming API.
It's also called reactive functional programming.
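The JDK's own SubmissionPublisher (java.util.concurrent, Java 9+) makes the slow-consumer case explicit: offer() does not block, and a caller-supplied handler decides what happens to an item that a lagging subscriber's buffer cannot hold. The sketch below, with a hypothetical SlowConsumerDemo class, uses a subscriber that never requests anything and simply counts the dropped items. Dropping is only one possible policy; disconnecting the consumer, as in the Twitter example, is another.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: a stalled consumer never signals demand, so its buffer
// in the publisher fills up and every further offer() is dropped.
class SlowConsumerDemo {
    static int countDrops(int items, int bufferCapacity) {
        AtomicInteger drops = new AtomicInteger();
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(ForkJoinPool.commonPool(), bufferCapacity)) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) { /* never calls s.request(n) */ }
                @Override public void onNext(Integer item) { }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });
            for (int i = 0; i < items; i++) {
                // offer() does not block: when the subscriber's buffer is full, the handler runs
                publisher.offer(i, (subscriber, item) -> {
                    drops.incrementAndGet();
                    return false; // false: do not retry, the item is dropped for this subscriber
                });
            }
        }
        return drops.get();
    }

    public static void main(String[] args) {
        // With a small buffer and no demand, most of the 100 items are dropped.
        System.out.println("dropped: " + countDrops(100, 8));
    }
}
```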
Model
Interface
The model has 4 interfaces.
Publisher
A Publisher is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s).
A Publisher can serve multiple Subscribers subscribed dynamically at various points in time.
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
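As an illustration of the Publisher contract, here is a minimal sketch (a hypothetical ListPublisher, written against the JDK's java.util.concurrent.Flow copy of the interfaces) that emits the elements of a list only as fast as the Subscriber requests them. It assumes a single-threaded, synchronous Subscriber and only guards against the request, onNext, request recursion; a production Publisher must also be thread-safe and should be checked with the TCK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;

// Hypothetical publisher: emits the elements of a list, never more than the demand received.
// Single-threaded sketch: request() and cancel() must be called from one thread.
final class ListPublisher<T> implements Flow.Publisher<T> {
    private final List<T> items;

    ListPublisher(List<T> items) { this.items = new ArrayList<>(items); }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Objects.requireNonNull(subscriber);
        // Each Subscriber gets its own Subscription, hence its own position in the list.
        subscriber.onSubscribe(new ListSubscription<>(items, subscriber));
    }

    private static final class ListSubscription<T> implements Flow.Subscription {
        private final List<T> items;
        private final Flow.Subscriber<? super T> subscriber;
        private int index;        // next element to emit
        private long demand;      // requested but not yet delivered
        private boolean emitting; // true while the emit loop below is running
        private boolean done;     // cancelled or terminated

        ListSubscription(List<T> items, Flow.Subscriber<? super T> subscriber) {
            this.items = items;
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            if (done) return;
            if (n <= 0) { // non-positive demand is a protocol violation
                done = true;
                subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
                return;
            }
            demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // saturate on overflow
            if (emitting) return; // re-entrant call from onNext: the running loop sees the new demand
            emitting = true;
            while (!done && demand > 0 && index < items.size()) {
                demand--;
                subscriber.onNext(items.get(index++));
            }
            if (!done && index == items.size()) { // every element delivered
                done = true;
                subscriber.onComplete();
            }
            emitting = false;
        }

        @Override
        public void cancel() { done = true; }
    }
}

class ListPublisherDemo {
    // Subscribes with a Subscriber that requests `batch` elements at a time and records every signal.
    static List<String> collect(List<String> items, long batch) {
        List<String> signals = new ArrayList<>();
        new ListPublisher<>(items).subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;
            private long remaining;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                remaining = batch;
                s.request(batch);
            }
            @Override public void onNext(String item) {
                signals.add(item);
                if (--remaining == 0) { remaining = batch; subscription.request(batch); }
            }
            @Override public void onError(Throwable t) { signals.add("error: " + t.getMessage()); }
            @Override public void onComplete() { signals.add("complete"); }
        });
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(collect(List.of("a", "b", "c"), 2)); // [a, b, c, complete]
    }
}
```

The emitting flag is a simple trampoline: a request(n) issued from inside onNext only adds to the demand, and the loop that is already running delivers the extra elements, so the call stack does not grow with the number of elements.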
Subscriber
A Subscriber is a receiver of messages: it arranges for items to be requested and processed.
It:
- connects to a Publisher,
- receives its Subscription via onSubscribe,
- then receives data via onNext and terminal signals via onError or onComplete.
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
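A Subscriber that applies flow control by requesting one element at a time might look like the sketch below. The OneByOneSubscriber and SubscriberDemo names are hypothetical; the upstream is the JDK's SubmissionPublisher (Java 9+), used here only as a ready-made Publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical subscriber: pulls one element at a time and collects everything it receives.
class OneByOneSubscriber<T> implements Flow.Subscriber<T> {
    final CompletableFuture<List<T>> result = new CompletableFuture<>();
    private final List<T> received = new ArrayList<>();
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // no element is pushed before this first demand
    }

    @Override
    public void onNext(T item) {
        received.add(item);
        subscription.request(1); // ask for the next element only once this one is handled
    }

    @Override
    public void onError(Throwable throwable) { result.completeExceptionally(throwable); }

    @Override
    public void onComplete() { result.complete(received); }
}

class SubscriberDemo {
    static List<String> run() {
        OneByOneSubscriber<String> subscriber = new OneByOneSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("a");
            publisher.submit("b");
            publisher.submit("c");
        } // close(): onComplete is signalled after the buffered items
        return subscriber.result.orTimeout(5, TimeUnit.SECONDS).join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a, b, c]
    }
}
```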
Subscription
A Subscription represents the link between a Publisher and a Subscriber. It is used to:
- signal demand for data (request(n)),
- cancel demand (cancel()), which allows resources to be cleaned up.
public interface Subscription {
    public void request(long n);
    public void cancel();
}
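The bookkeeping behind request(n) and cancel() can be sketched on its own. The hypothetical DemandTrackingSubscription below follows specification rules that are easy to get wrong: a non-positive request is an error signalled through onError, demand accumulates and saturates at Long.MAX_VALUE (which a Publisher may treat as unbounded), and once cancelled, further request and cancel calls are no-ops. A real Subscription would also drive onNext; here a Consumer<Throwable> stands in for subscriber.onError.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical Subscription that only does the bookkeeping of demand and cancellation.
final class DemandTrackingSubscription implements Flow.Subscription {
    private final AtomicLong demand = new AtomicLong();
    private final Consumer<Throwable> onError; // stands in for subscriber.onError
    private volatile boolean cancelled;

    DemandTrackingSubscription(Consumer<Throwable> onError) { this.onError = onError; }

    @Override
    public void request(long n) {
        if (cancelled) return;        // request after cancel is a no-op
        if (n <= 0) {                 // non-positive demand: signal an error, stop the stream
            cancelled = true;
            onError.accept(new IllegalArgumentException("request must be positive, got " + n));
            return;
        }
        // Accumulate demand, saturating at Long.MAX_VALUE instead of overflowing.
        demand.getAndAccumulate(n, (current, added) -> {
            long sum = current + added;
            return sum < 0 ? Long.MAX_VALUE : sum;
        });
    }

    @Override
    public void cancel() { cancelled = true; } // idempotent: cancelling twice is a no-op

    // Called by the publishing side before each onNext: may one more element be emitted?
    boolean tryConsumeOne() {
        while (!cancelled) {
            long current = demand.get();
            if (current == 0) return false;
            if (current == Long.MAX_VALUE) return true; // treated as unbounded: never decremented
            if (demand.compareAndSet(current, current - 1)) return true;
        }
        return false;
    }

    long outstanding() { return demand.get(); }

    boolean isCancelled() { return cancelled; }
}

class SubscriptionDemo {
    public static void main(String[] args) {
        List<Throwable> errors = new ArrayList<>();
        DemandTrackingSubscription s = new DemandTrackingSubscription(errors::add);
        s.request(2);
        System.out.println(s.tryConsumeOne() + " " + s.tryConsumeOne() + " " + s.tryConsumeOne()); // true true false
        s.request(Long.MAX_VALUE);
        s.request(10);
        System.out.println(s.outstanding() == Long.MAX_VALUE); // true
        s.request(0);
        System.out.println(errors.size() + " " + s.isCancelled()); // 1 true
    }
}
```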
Processor
A Processor combines the capabilities of a Publisher and a Subscriber in a single interface.
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
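A Processor can be built by combining the two sides. The sketch below is adapted from the TransformProcessor example in the JDK documentation of SubmissionPublisher: it subscribes upstream one element at a time and republishes each transformed element through the SubmissionPublisher it extends. MapProcessor and ProcessorDemo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical map stage: a Subscriber<T> upstream and a Publisher<R> downstream.
class MapProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> function;
    private Flow.Subscription subscription;

    MapProcessor(Function<? super T, ? extends R> function) {
        super(); // default executor and default buffer size
        this.function = function;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        (this.subscription = subscription).request(1);
    }

    @Override
    public void onNext(T item) {
        subscription.request(1);      // keep one upstream element in flight
        submit(function.apply(item)); // submit() blocks while a downstream buffer is full
    }

    @Override
    public void onError(Throwable throwable) { closeExceptionally(throwable); } // forward the error

    @Override
    public void onComplete() { close(); } // forward completion downstream
}

class ProcessorDemo {
    static List<Integer> run(List<Integer> input) {
        List<Integer> output = new ArrayList<>();
        MapProcessor<Integer, Integer> timesTen = new MapProcessor<Integer, Integer>(x -> x * 10);
        CompletableFuture<Void> done = timesTen.consume(output::add); // processor -> consumer
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            source.subscribe(timesTen);                                // source -> processor
            input.forEach(source::submit);
        } // closing the source propagates onComplete through the processor
        done.orTimeout(5, TimeUnit.SECONDS).join();
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3))); // [10, 20, 30]
    }
}
```

The processor is deliberately not closed by the caller: it closes itself when the upstream completes, which keeps the completion signal ordered after the last transformed element.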
Lifecycle of the Subscriber methods
In response to a call to Publisher.subscribe(Subscriber) the possible invocation sequences for methods on the Subscriber are given by the following protocol:
onSubscribe onNext* (onError | onComplete)?
That is:
- onSubscribe is always signalled first,
- followed by a possibly unbounded number of onNext signals (as requested by the Subscriber),
- followed by an onError signal if there is a failure, or an onComplete signal when no more elements are available,
- all as long as the Subscription is not cancelled.
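The protocol can be checked mechanically. The hypothetical ProtocolChecker below is a Subscriber that, instead of processing data, records every signal that breaks the grammar onSubscribe onNext* (onError | onComplete)?. It is driven by hand in ProtocolCheckerDemo, so no Publisher is involved, and it checks ordering only, not demand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical checker for the grammar: onSubscribe onNext* (onError | onComplete)?
class ProtocolChecker<T> implements Flow.Subscriber<T> {
    private enum State { CREATED, SUBSCRIBED, TERMINATED }

    private State state = State.CREATED;
    final List<String> violations = new ArrayList<>();

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        if (state == State.CREATED) {
            state = State.SUBSCRIBED;
        } else {
            violations.add("onSubscribe signalled more than once");
        }
    }

    @Override
    public void onNext(T item) { check("onNext"); }

    @Override
    public void onError(Throwable throwable) { check("onError"); state = State.TERMINATED; }

    @Override
    public void onComplete() { check("onComplete"); state = State.TERMINATED; }

    // Any other signal must come after onSubscribe and before a terminal signal.
    private void check(String signal) {
        if (state == State.CREATED) {
            violations.add(signal + " before onSubscribe");
        } else if (state == State.TERMINATED) {
            violations.add(signal + " after a terminal signal");
        }
    }
}

class ProtocolCheckerDemo {
    private static final Flow.Subscription NO_OP = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    // Replays a sequence of signal names against a fresh checker and returns the violations.
    static List<String> check(String... signals) {
        ProtocolChecker<String> checker = new ProtocolChecker<>();
        for (String signal : signals) {
            switch (signal) {
                case "onSubscribe": checker.onSubscribe(NO_OP); break;
                case "onNext": checker.onNext("item"); break;
                case "onError": checker.onError(new RuntimeException("boom")); break;
                case "onComplete": checker.onComplete(); break;
                default: throw new IllegalArgumentException("unknown signal: " + signal);
            }
        }
        return checker.violations;
    }

    public static void main(String[] args) {
        System.out.println(check("onSubscribe", "onNext", "onNext", "onComplete")); // []
        System.out.println(check("onNext", "onSubscribe", "onComplete", "onNext"));
        // [onNext before onSubscribe, onNext after a terminal signal]
    }
}
```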
Asynchronous
This section shows different scenarios that are all valid asynchronous streams. They all have their place, and each has different trade-offs, including performance and implementation complexity.
The examples use the following flow:
nioSelectorThreadOrigin map(f) filter(p) consumeTo(toNioSelectorOutput)
where:
- the origin is an asynchronous NIO selector event loop,
- the destination is an asynchronous NIO selector event loop.
In this flow, the demand signalled by Subscription.request(n) must be chained from the destination back to the origin. Each implementation can choose how to schedule this work, as the following scenarios show.
The following scenarios use:
- the pipe | character to signal async boundaries (queue and schedule)
- and R# to represent resources (possibly threads).
Scenario 1
nioSelectorThreadOrigin | map(f) | filter(p) | consumeTo(toNioSelectorOutput)
-------------- R1 ---- | - R2 - | -- R3 --- | ---------- R4 ----------------
In this scenario, each of the three consumers (map, filter and consumeTo) schedules its work asynchronously. This could be on the same event loop (trampoline), on separate threads, or on any other resource.
Scenario 2
nioSelectorThreadOrigin map(f) filter(p) | consumeTo(toNioSelectorOutput)
------------------- R1 ----------------- | ---------- R2 ----------------
- Here it is only the final step that asynchronously schedules, by adding work to the NioSelectorOutput event loop.
- The map and filter steps are synchronously performed on the origin thread.
Scenario 3
Another implementation could fuse the operations into the final consumer:
nioSelectorThreadOrigin | map(f) filter(p) consumeTo(toNioSelectorOutput)
--------- R1 ---------- | ------------------ R2 -------------------------
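Scenario 2 can be sketched with JDK types only, under the assumption that the NIO event loops are replaced by plain Java threads and that a hypothetical Scenario2Demo class stands in for the pipeline. map and filter run on the calling thread (R1), and a SubmissionPublisher backed by a single-thread executor whose thread is named R2 is the one async boundary (queue and schedule) in front of the consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch of Scenario 2:
//   origin map(f) filter(p) | consumeTo(output)
//   --------- R1 ---------- | ------ R2 -------
// The SubmissionPublisher plays the role of the '|' boundary (a queue plus scheduling on R2).
class Scenario2Demo {
    static List<String> run(List<Integer> source) {
        Function<Integer, Integer> f = x -> x * x; // map(f)
        Predicate<Integer> p = x -> x % 2 == 0;    // filter(p)
        ExecutorService r2 = Executors.newSingleThreadExecutor(r -> new Thread(r, "R2"));
        List<String> consumed = new ArrayList<>(); // only touched by R2 until `done` completes
        try {
            CompletableFuture<Void> done;
            try (SubmissionPublisher<Integer> boundary =
                     new SubmissionPublisher<>(r2, Flow.defaultBufferSize())) {
                // consumeTo: runs on R2 and records which thread handled each element
                done = boundary.consume(y -> consumed.add(Thread.currentThread().getName() + ":" + y));
                for (Integer x : source) {  // R1: the calling thread does map and filter itself
                    Integer y = f.apply(x);
                    if (p.test(y)) {
                        boundary.submit(y); // crossing the async boundary
                    }
                }
            } // close() signals onComplete once R2 has drained the queue
            done.orTimeout(5, TimeUnit.SECONDS).join();
            return consumed;
        } finally {
            r2.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3, 4))); // [R2:4, R2:16]
    }
}
```

Adding one such boundary in front of every stage would give Scenario 1; placing the single boundary right after the origin, with map and filter running inside the consumer, would give Scenario 3.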
Language
Java
The Reactive Streams specification for the JVM defines the basic semantics of how the transmission of stream elements is regulated through back-pressure.
The following are not part of this specification:
- how elements are transmitted,
- their representation during transfer,
- how back-pressure is signaled.
Reactive Streams includes:
- a Test Kit, the Reactive Streams TCK (Technology Compatibility Kit): a test framework that verifies whether an implementation of reactive components is correct, in terms of the components interacting correctly with each other.
Starting from Java 9, the interfaces are part of the JDK as the nested interfaces of java.util.concurrent.Flow (Flow.Publisher, Flow.Subscriber, Flow.Subscription, Flow.Processor). They are also available as Java dependencies with the following Maven coordinates:
- org.reactivestreams:reactive-streams:1.0.3
- org.reactivestreams:reactive-streams-tck:1.0.3
- org.reactivestreams:reactive-streams-tck-flow:1.0.3
- org.reactivestreams:reactive-streams-examples:1.0.3