Java - Stream Processing

About

Streams may be by backed:

  • finite:
    • collections,
    • arrays,
    • IO channel (such as those returned by Files.lines(Path, Charset)) - require closing.
  • infinite (Generating function)

How to

Return the first element

Integer i = IntStream.range(2,11).findFirst().orElse(null);

As findFirst is an optional, you use orElse instead of get because get will throw an exception if the value is null

Throw an exception if not found

Integer max = Arrays.stream(this.series)
    .max()
    .orElseThrow(
        ()->new RuntimeException("No max found")
    );

Allow null

Optional does not allow null, you can filter or map the value to a nullable optional

.map(Optional::ofNullable)

Operation

aggregate operations

processElements Action Aggregate Operation
Obtain a source of objects Stream<E> stream()
Filter objects that match a Predicate object Stream<T> filter(Predicate<? super T> predicate)
Map objects to another value as specified by a Function object <R> Stream<R> map(Function<? super T,? extends R> mapper)
Perform an action as specified by a Consumer object void forEach(Consumer<? super T> action)
members
    .stream()
    .filter(
        p -> p.getGender() == Person.Sex.MALE
            && p.getAge() >= 18
            && p.getAge() <= 25)
    .map(p -> p.getEmailAddress())
    .forEach(email -> System.out.println(email));

Aggregate operations process elements from a stream, not directly from a collection but from a stream (pipeline)

  • filter,
  • map,
  • forEach

Aggregate operations accept lambda expressions as parameters.

Aggregate vs Iterator

Fundamental differences between Aggregate operations (like forEach) and iterators.

Parallelism

Aggregate operations:

  • determines what collection it iterates
  • and delegates how to iterate the collection to the JDK (internal iteration)

That's why aggregate operations do not contain a method like next to instruct them to process the next element of the collection (called an external iteration, when the application determines both what collection it iterates and how it iterates it). External iteration can only iterate over the elements of a collection sequentially.

Internal iteration can iterate in parallel and not sequentially by implementing a map-reduce paradigm

See Parallelism

Stream

They process elements from a stream: Aggregate operations process elements from a stream, not directly from a collection. Consequently, they are also called stream operations.

Behavior

They support behavior as parameters: You can specify lambda expressions as parameters for most aggregate operations. This enables you to customize the behavior of a particular aggregate operation.

Pipeline

Data Processing - Data Flow (ETL | Workflow | Pipeline)

A pipeline is a sequence of stream operations (ie aggregate operations)

A stream is a sequence of elements. Unlike a collection, it is not a data structure that stores elements. Instead, a stream carries values from a source through a pipeline.

A stream is created by invoking the method stream.

In a pipeline, an intermediate operation, such as filter, produces a new stream.

A pipeline contains the following components:

  • A source: This could be :
    • a collection,
    • an array,
    • a generator function,
    • or an I/O channel.
  • Zero or more intermediate operations. An intermediate operation, such as filter, produces a new stream.
  • A terminal operation. A terminal operation, such as forEach, produces a non-stream result, such as:
    • a primitive value (like a double value), (such as average, sum, min, max, and count) - return one value by combining the contents of a stream - These operations are called reduction operations (Reduction operation doc
    • a collection,
    • or in the case of forEach, no value at all.

The Java runtime and compiler infer that the type of the lambda arguments.

Example

Average

average age of all male members

double average = members
    .stream() // create a stream of members
    .filter(p -> p.getGender() == Person.Sex.MALE) // return a stream of male person
    .mapToInt(Person::getAge) // return the age of each mal members in a new stream
    .average() // average them
    .getAsDouble(); // return it as double

Operation

Peek

Peek For debugging or logging purpose (stateful therefore, ie the log pointer should be known)

method exists mainly to support debugging, where you want to see the elements as they flow past a certain point in a pipeline

 Stream.of(1, 2, 3, 4)
      .filter(e -> e > 2)
      .peek(e -> System.out.println("Peek: Value after the Filter stage: " + e))
      .map(e -> e + 1)
      .peek(e -> System.out.println("Peek: Value after the Mapped value: " + e))
      .collect(Collectors.toList());
Peek: Value after the Filter stage: 3
Peek: Value after the Mapped value: 4
Peek: Value after the Filter stage: 4
Peek: Value after the Mapped value: 5

Reduce

General-purpose reduction operations:

Example

Example: sum implementations.

Integer totalAgeReduce = roster
   .stream()
   .map(Person::getAge)
   .reduce(
       0,
       (a, b) -> a + b
   );

where:

  • 0 is the value of the identity argument. It is both:
    • the initial value of the reduction
    • and the default result if there are no elements in the stream.
  • (a, b) → a + b is a lambda expression that defines the accumulator function where
    • a is the partial value of all processed elements so far (In this case, a sum)
    • b is the next element of the stream
    • and a+b is the operation that returns a partial result

The reduce method creates always a new value when it processes an element.

Which means that when the reduce operation is just to add elements to a collection, the accumulator function will:

  • process an element,
  • creates a new collection
  • and add the elements to the collection,

every time which is particularly inefficient. The Stream.collect method solves this problem because it updates an existing collection instead.

Collect

Like the reduce method and all other reduction methods, the collect method returns only one value.

https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html#collect

Example:

  • Return a list
stream
.map(Obj:FunctionThatReturnAList)
.collect(ArrayList::new, List::addAll, List::addAll);


List<String> asList = stringStream.collect(ArrayList::new, ArrayList::add,ArrayList::addAll);

Interface

Function<Input,Output>

Compose

Function<String, String> helloOp = (s) -> "Hello " + s;
    Function<String, String> shoutedOp = (s) ->  s + " !!!";
    Function<String, String> helloShoutedOp = shoutedOp.compose(helloOp);
    String helloWorldShouted = helloShoutedOp.apply("World");
    System.out.println(helloWorldShouted);
Hello World !!!

Type

Infinite

generating functions, The only way to make such a stream finite, is to chain a limit operation to the stream.

Finite

If you want to create a non-empty finite stream that is not backed by an array or collection and none of the existing stream sources fits, you have to implement your own Spliterator and create a stream out of it

Stateful vs stateless

Stateful means that the function that implements the interface have local variables that records the stream that passed by.

Collector and Consumer are two lambda interfaces that by definition are stateful.

Operation Type Why
collect(Collector) Stateful collect all the elements in a collection (state)
forEach(Consumer) Stateful the Consumer is by definition stateful, well except if it's a black hole (no-op)
peek(Consumer) Stateful The Consumer is by definition stateful, because why peek if not to store it somewhere (e.g. log)
distinct Stateful
Predicate, Function, UnaryOperator, BinaryOperator, and Comparator Stateless

Implementation

The standard way of implementing a stream, is to implement:

In either case, the implementation has a way to report an end when:

  • Spliterator.tryAdvance returns false or its forEachRemaining method just returns,
  • or in case of an Iterator source, when hasNext() returns false.

You can use an existing method to create a Spliterator out of an Iterator, but you should resists the temptation to use an Iterator just because it’s familiar. A Spliterator is not hard to implement.

Ref

Execution

The “laziness” of Streams means that they start executing only when they encounter the terminal (final) operation.

Close

try (IntStream stream = IntStream.range(1,10)) {
      stream.onClose(() -> System.out.println("--")).forEach(x -> System.out.println(x));
}

Powered by ComboStrap