Java Lambda Stream原理

举例

1
2
3
4
5
6
7
List list = new ArrayList(1);
list.add(1);
list.add(2);

list.stream().forEach(
System.out::println
);

List.stream方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}

@Override
default Spliterator<E> spliterator() {
return Spliterators.spliterator(this, 0);
}

public static <T> Spliterator<T> spliterator(Collection<? extends T> c,
int characteristics) {
return new IteratorSpliterator<>(Objects.requireNonNull(c),
characteristics);
}

List的stream()接口位于java.util.Collection类中;默认实现输入的参数1是拆分方法spliterator,2是并行默认false。
spliterator()接口也在该类中,默认实现调用final Spliterators类的spliterator方法,返回IteratorSpliterator。
而static IteratorSpliterator类实现了Spliterator。
Spliterator提供了tryAdvance处理每个元素、forEachRemaining、trySplit分割拆分等方法。

也就是说,stream()实际是采用Spliterator对于元素进行遍历、拆分处理。

StreamSupport.stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
Head(Spliterator<?> source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}

ReferencePipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}

list.stream()方法最终会实例化ReferencePipeline.Head<>对象,Head为pipeline流的头结。
Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT>,E_IN为上游输入类型,E_OUT为输出类型点。
StreamOpFlag.fromCharacteristics(spliterator),将spliterator字符集转换为带有排序的流标记。
ReferencePipeline为继承了AbstractPipeline得抽象类,提供pipeline处理类型的各阶段基类。

AbstractPipeline

1
2
3
4
5
6
7
8
9
10
11
12
AbstractPipeline(Spliterator<?> source,
int sourceFlags, boolean parallel) {
this.previousStage = null;
this.sourceSpliterator = source;
this.sourceStage = this;
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}

AbstractPipeline为pipeline抽象基类,定义包括前一个、当前、下一个等AbstractPipeline处理流程等。
AbstractPipeline继承abstract class PipelineHelper<P_OUT>,PipelineHelper定义了流的操作、输出、标记和并行等参数。

forEach

1
void forEach(Consumer<? super T> action);

对流中的每个元素执行操作。

1
2
3
4
5
6
7
8
9
@Override
public void forEach(Consumer<? super E_OUT> action) {
if (!isParallel()) {
sourceStageSpliterator().forEachRemaining(action);
}
else {
super.forEach(action);
}
}

forEach具体实现位于ReferencePipeline中,执行串行遍历或并行分割处理。

forEach并行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public void forEach(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, false));
}
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;

return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper,
Spliterator<P_IN> spliterator) {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");
return evaluateSequential(helper, spliterator);
}

@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(this, spliterator).get();
}

forEach并行时,主要采用evaluate方法,在pipeline中采用终止操作处理结果。

------ 本文结束------

本文标题:Java Lambda Stream原理

文章作者:Perkins

发布时间:2019年08月07日

原始链接:https://perkins4j2.github.io/posts/42366/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。