Perform an action as specified by a Consumer object | void forEach(Consumer<? super T> action) |
members
.stream()
.filter(
p -> p.getGender() == Person.Sex.MALE
&& p.getAge() >= 18
&& p.getAge() <= 25)
.map(p -> p.getEmailAddress())
.forEach(email -> System.out.println(email));
Aggregate operations process elements from a stream (pipeline), not directly from a collection. The example above chains three of them:
- filter,
- map,
- forEach
Aggregate vs Iterator
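Fundamental differences between aggregate operations (like forEach) and iterators.
Parallelism
With aggregate operations, the application:
- determines what collection it iterates
- and delegates how to iterate the collection to the JDK (internal iteration)
With an iterator (external iteration), the application determines both what it iterates and how, and it can only do so sequentially. Because internal iteration leaves the traversal to the JDK, it can more easily take advantage of parallel computing.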
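The difference between the two iteration styles can be sketched as follows. This is a minimal illustration, not taken from the original examples: the class name IterationStyles and the sample names are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical class, used only to contrast the two iteration styles.
class IterationStyles {

    // External iteration: the caller drives the traversal with an Iterator.
    static List<String> external(List<String> names) {
        List<String> result = new ArrayList<>();
        Iterator<String> it = names.iterator();
        while (it.hasNext()) {
            String name = it.next();
            if (name.startsWith("A")) {
                result.add(name.toUpperCase());
            }
        }
        return result;
    }

    // Internal iteration: the caller states what to do, the JDK decides how to traverse.
    static List<String> internal(List<String> names) {
        return names.stream()
                .filter(name -> name.startsWith("A"))
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Alice", "Bob", "Anna");
        System.out.println(external(names)); // [ALICE, ANNA]
        System.out.println(internal(names)); // [ALICE, ANNA]
    }
}
```

Since the traversal is delegated in the second method, replacing stream() with parallelStream() would request parallel processing without touching the rest of the pipeline.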
Stream
They process elements from a stream: Aggregate operations process elements from a stream, not directly from a collection. Consequently, they are also called stream operations.
Behavior
They support behavior as parameters: You can specify lambda expressions as parameters for most aggregate operations. This enables you to customize the behavior of a particular aggregate operation.
Pipeline
A pipeline is a sequence of stream operations (i.e. aggregate operations).
A stream is a sequence of elements. Unlike a collection, it is not a data structure that stores elements. Instead, a stream carries values from a source through a pipeline.
A stream is created by invoking the method stream.
A pipeline contains the following components:
- A source. This could be:
  - a collection,
  - an array,
  - a generator function,
  - or an I/O channel.
- Zero or more intermediate operations. An intermediate operation, such as filter, produces a new stream.
- A terminal operation. A terminal operation, such as forEach, produces a non-stream result, such as:
  - a primitive value (like a double value). Operations such as average, sum, min, max, and count return one value by combining the contents of a stream; they are called reduction operations,
  - a collection,
  - or in the case of forEach, no value at all.
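The three components can be labelled on a small pipeline. The sketch below is only an illustration: the class PipelineParts and its two methods are assumed names, not part of the original examples.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical class that annotates each component of a pipeline.
class PipelineParts {

    static List<Integer> squaresOfEvens(List<Integer> numbers) {
        return numbers.stream()                // source: a collection
                .filter(n -> n % 2 == 0)       // intermediate operation: returns a new stream
                .map(n -> n * n)               // intermediate operation: returns a new stream
                .collect(Collectors.toList()); // terminal operation: produces a collection
    }

    static long countEvens(int[] numbers) {
        return Arrays.stream(numbers)          // source: an array
                .filter(n -> n % 2 == 0)       // intermediate operation
                .count();                      // terminal operation (reduction): one primitive value
    }

    public static void main(String[] args) {
        System.out.println(squaresOfEvens(Arrays.asList(1, 2, 3, 4))); // [4, 16]
        System.out.println(countEvens(new int[] {1, 2, 3, 4, 5, 6}));  // 3
    }
}
```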
Example
https://docs.oracle.com/javase/tutorial/collections/streams/examples/BulkDataOperationsExamples.java
Average
Average age of all male members:
double average = members
.stream() // create a stream of members
.filter(p -> p.getGender() == Person.Sex.MALE) // return a stream of the male members
.mapToInt(Person::getAge) // return the age of each male member in a new stream
.average() // average them
.getAsDouble(); // return it as double
Operation
Peek
For debugging or logging purposes (stateful therefore, i.e. the log pointer should be known). The peek method exists mainly to support debugging, where you want to see the elements as they flow past a certain point in a pipeline.
Stream.of(1, 2, 3, 4)
.filter(e -> e > 2)
.peek(e -> System.out.println("Peek: Value after the Filter stage: " + e))
.map(e -> e + 1)
.peek(e -> System.out.println("Peek: Value after the Mapped value: " + e))
.collect(Collectors.toList());
Peek: Value after the Filter stage: 3
Peek: Value after the Mapped value: 4
Peek: Value after the Filter stage: 4
Peek: Value after the Mapped value: 5
Reduce
General-purpose reduction operations:
Integer totalAgeReduce = roster
.stream()
.map(Person::getAge)
.reduce(
0,
(a, b) -> a + b
);
where:
- 0 is the value of the identity argument. It is both:
  - the initial value of the reduction
  - and the default result if there are no elements in the stream.
- (a, b) -> a + b is a lambda expression that defines the accumulator function where
  - a is the partial value of all processed elements so far (in this case, a sum)
  - b is the next element of the stream
  - and a + b is the operation that returns a partial result
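The two roles of the identity can be checked with a small sketch. The class ReduceIdentity and the sample ages are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical class showing both roles of the identity argument.
class ReduceIdentity {

    static int sum(List<Integer> values) {
        // 0 is the initial partial result and the result for an empty stream.
        return values.stream().reduce(0, (a, b) -> a + b);
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(20, 30, 40)));       // 90
        System.out.println(sum(Collections.<Integer>emptyList())); // 0
    }
}
```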
The reduce method always creates a new value when it processes an element. This means that when the reduce operation only adds elements to a collection, the accumulator function will, for every element:
- process the element,
- create a new collection
- and add the element to that collection,
which is inefficient. Updating an existing collection, as the collect method does, avoids this.
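The cost described above can be made visible by writing the same list-building reduction twice. This is a sketch under assumptions: the class ReduceVersusCollect is hypothetical, and the reduce variant copies the list on purpose to respect the rule that an accumulator returns a new value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical class contrasting immutable reduction with mutable reduction.
class ReduceVersusCollect {

    // reduce: every step builds a brand-new list from the previous one.
    static List<String> withReduce(List<String> words) {
        List<String> identity = new ArrayList<>();
        return words.stream().reduce(
                identity,
                (list, word) -> {
                    List<String> copy = new ArrayList<>(list); // new collection per element
                    copy.add(word);
                    return copy;
                },
                (left, right) -> {
                    List<String> merged = new ArrayList<>(left); // only used by parallel streams
                    merged.addAll(right);
                    return merged;
                });
    }

    // collect: one container is created, then mutated in place.
    static List<String> withCollect(List<String> words) {
        return words.stream().collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "c");
        System.out.println(withReduce(words));  // [a, b, c]
        System.out.println(withCollect(words)); // [a, b, c]
    }
}
```

Both methods return the same list; the reduce variant allocates one list per element, while the collect variant allocates a single list for a sequential stream.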
Collect
Like the reduce method and all other reduction methods, the collect method returns only one value.
https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html#collect
Example:
- Return a list
stream
.map(Obj::functionThatReturnsAList) // placeholder: any function that returns a List
.collect(ArrayList::new, List::addAll, List::addAll); // flatten the lists into one ArrayList
List<String> asList = stringStream.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
Interface
Function
Compose
Function<String, String> helloOp = (s) -> "Hello " + s;
Function<String, String> shoutedOp = (s) -> s + " !!!";
Function<String, String> helloShoutedOp = shoutedOp.compose(helloOp);
String helloWorldShouted = helloShoutedOp.apply("World");
System.out.println(helloWorldShouted);
Hello World !!!
Type
Infinite
Infinite streams are produced by generating functions (such as Stream.generate or Stream.iterate). In Java 8, the only way to make such a stream finite is to chain a limit operation to the stream.
Finite
If you want to create a non-empty finite stream that is not backed by an array or collection and none of the existing stream sources fits, you have to implement your own Spliterator and create a stream out of it.
Stateful vs stateless
Stateful means that the function implementing the interface keeps state (for example variables) that records the elements that passed by.
Collector and Consumer are two interfaces that are stateful by definition.
Operation | Type | Why |
---|---|---|
collect(Collector) | Stateful | collects all the elements in a collection (state) |
forEach(Consumer) | Stateful | the Consumer is by definition stateful, except if it is a black hole (no-op) |
peek(Consumer) | Stateful | the Consumer is by definition stateful: why peek if not to store the element somewhere (e.g. a log) |
distinct | Stateful | must remember the elements already seen |
Predicate, Function, UnaryOperator, BinaryOperator, and Comparator | Stateless | |
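The distinction in the table can be sketched with one stateless and one stateful function. The class StatefulVersusStateless and its members are assumed names, and the stream is sequential; a parallel stream would need a thread-safe sink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;

// Hypothetical class contrasting a stateless Predicate with a stateful Consumer.
class StatefulVersusStateless {

    // Stateless: the result depends only on the argument.
    static final Predicate<Integer> IS_EVEN = n -> n % 2 == 0;

    // Stateful: the consumer writes every element it receives into the given list.
    static Consumer<Integer> recorderInto(List<Integer> sink) {
        return sink::add;
    }

    static List<Integer> recordEvens(Stream<Integer> numbers) {
        List<Integer> seen = new ArrayList<>();
        numbers.filter(IS_EVEN).forEach(recorderInto(seen));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(recordEvens(Stream.of(1, 2, 3, 4))); // [2, 4]
    }
}
```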
Implementation
The standard way of implementing a stream is to implement:
- a Spliterator directly
- or an Iterator, which is then wrapped in a Spliterator.
In either case, the implementation has a way to report the end of the stream:
- Spliterator.tryAdvance returns false or its forEachRemaining method just returns,
- or, in the case of an Iterator source, hasNext() returns false.
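A minimal sketch of the Spliterator route follows, assuming a made-up source that counts down from a start value to 1. The names CountdownStream and CountdownSpliterator are hypothetical; Spliterators.AbstractSpliterator and StreamSupport.stream are the JDK helpers used to keep the implementation short.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical finite stream source that is not backed by an array or a collection.
class CountdownStream {

    static class CountdownSpliterator extends Spliterators.AbstractSpliterator<Integer> {
        private int current;

        CountdownSpliterator(int start) {
            // Estimated size and characteristics of the source.
            super(Math.max(start, 0), Spliterator.ORDERED);
            this.current = start;
        }

        @Override
        public boolean tryAdvance(Consumer<? super Integer> action) {
            if (current <= 0) {
                return false; // reports the end of the stream
            }
            action.accept(current--);
            return true;
        }
    }

    static Stream<Integer> countdown(int start) {
        // The second argument selects a sequential (false) or parallel (true) stream.
        return StreamSupport.stream(new CountdownSpliterator(start), false);
    }

    public static void main(String[] args) {
        System.out.println(countdown(3).collect(Collectors.toList())); // [3, 2, 1]
    }
}
```

For the Iterator route, an existing Iterator could instead be wrapped with Spliterators.spliteratorUnknownSize before being passed to StreamSupport.stream.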
Execution
The “laziness” of streams means that they start executing only when they encounter the terminal (final) operation.
Close
try (IntStream stream = IntStream.range(1,10)) {
stream.onClose(() -> System.out.println("--")).forEach(x -> System.out.println(x));
}