编程技术文章分享与教程

网站首页 > 技术文章 正文

13万字详细分析JDK中Stream的实现原理(中)

hmc789 2024-11-16 20:57:24 技术文章 4 ℃

文章篇幅过长,所以将本文分为了三篇,想要看前面内容的读者朋友们点此链接,或查看本人主页:13万字详细分析JDK中Stream的实现原理(上)

ReferencePipeline源码分析#

既然Stream具备流的特性,那么就需要一个链式数据结构,让元素能够从Source一直往下"流动"和传递到每一个链节点,实现这种场景的常用数据结构就是双向链表(考虑需要回溯,单向链表不太合适),目前比较著名的实现有AQS和Netty中的ChannelHandlerContext。例如Netty中的流水线ChannelPipeline设计如下:

对于这个双向链表的数据结构,Stream中对应的类就是AbstractPipeline,核心实现类在ReferencePipeline和ReferencePipeline的内部类。

主要接口#

先简单展示AbstractPipeline的核心父类方法定义,主要接父类是Stream、BaseStream和PipelineHelper:

  • Stream代表一个支持串行和并行聚合操作集合的元素序列,此顶层接口提供了流中间操作、终结操作和一些静态工厂方法的定义(由于方法太多,这里不全部列举),这个接口本质是一个建造器类型接口(对接中间操作来说),可以构成一个多中间操作,单终结操作的链,例如:
public interface Stream<T> extends BaseStream<T, Stream<T>> {
    
    // 忽略其他代码

    // 过滤Op
    Stream<T> filter(Predicate<? super T> predicate);

    // 映射Op
    <R> Stream<R> map(Function<? super T, ? extends R> mapper);
    
    // 终结操作 - 遍历
    void forEach(Consumer<? super T> action);

    // 忽略其他代码
}

// init
Stream x = buildStream();
// chain: head -> filter(Op) -> map(Op) -> forEach(Terminal Op)
x.filter().map().forEach()
  • BaseStream:Stream的基础接口,定义流的迭代器、流的等效变体(并发处理变体、同步处理变体和不支持顺序处理元素变体)、并发和同步判断以及关闭相关方法
// T是元素类型,S是BaseStream<T, S>类型
// 流的基础接口,这里的流指定的支持同步执行和异步执行的聚合操作的元素序列
public interface BaseStream<T, S extends BaseStream<T, S>> extends AutoCloseable {
    
    // 返回一个当前Stream实例中所有元素的迭代器
    // 这是一个终结操作
    Iterator<T> iterator();
    
    // 返回一个当前Stream实例中所有元素的可拆分迭代器
    Spliterator<T> spliterator();
    
    // 当前的Stream实例是否支持并发
    boolean isParallel();
    
    // 返回一个等效的同步处理的Stream实例
    S sequential();
    
    // 返回一个等效的并发处理的Stream实例
    S parallel();
    
    // 返回一个等效的不支持StreamOpFlag.ORDERED特性的Stream实例
    // 或者说支持StreamOpFlag.NOT_ORDERED的特性,也就返回的变体Stream在处理元素的时候不需要顺序处理
    S unordered();
    
    // 返回一个添加了close处理器的Stream实例,close处理器会在下面的close方法中回调
    S onClose(Runnable closeHandler);
    
    // 关闭当前Stream实例,回调关联本Stream的所有close处理器
    @Override
    void close();
}
  • PipelineHelper:
abstract class PipelineHelper<P_OUT> {

    // 获取流的流水线的数据源的"形状",其实就是数据源元素的类型
    // 主要有四种类型:REFERENCE(除了int、long和double之外的引用类型)、INT_VALUE、LONG_VALUE和DOUBLE_VALUE
    abstract StreamShape getSourceShape();

    // 获取合并流和流操作的标志,合并的标志包括流的数据源标志、中间操作标志和终结操作标志
    // 从实现上看是当前流管道节点合并前面所有节点和自身节点标志的所有标志
    abstract int getStreamAndOpFlags();

    // 如果当前的流管道节点的合并标志集合支持SIZED,则调用Spliterator.getExactSizeIfKnown()返回数据源中的准确元素数量,否则返回-1
    abstract<P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);

    // 相当于调用下面的方法组合:copyInto(wrapSink(sink), spliterator)
    abstract<P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);

    // 发送所有来自Spliterator中的元素到Sink中,如果支持SHORT_CIRCUIT标志,则会调用copyIntoWithCancel
    abstract<P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);

    // 发送所有来自Spliterator中的元素到Sink中,Sink处理完每个元素后会检查Sink#cancellationRequested()方法的状态去判断是否中断推送元素的操作
    abstract <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);

    // 创建接收元素类型为P_IN的Sink实例,实现PipelineHelper中描述的所有中间操作,用这个Sink去包装传入的Sink实例(传入的Sink实例的元素类型为PipelineHelper的输出类型P_OUT)
    abstract<P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);

    // 包装传入的spliterator,从源码来看,在Stream链的头节点调用会直接返回传入的实例,如果在非头节点调用会委托到StreamSpliterators.WrappingSpliterator()方法进行包装
    // 这个方法在源码中没有API注释
    abstract<P_IN> Spliterator<P_OUT> wrapSpliterator(Spliterator<P_IN> spliterator);

    // 构造一个兼容当前Stream元素"形状"的Node.Builder实例
    // 从源码来看直接委托到Nodes.builder()方法
    abstract Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown,
                                                 IntFunction<P_OUT[]> generator);

    // Stream流水线所有阶段(节点)应用于数据源Spliterator,输出的元素作为结果收集起来转化为Node实例
    // 此方法应用于toArray()方法的计算,本质上是一个终结操作
    abstract<P_IN> Node<P_OUT> evaluate(Spliterator<P_IN> spliterator,
                                        boolean flatten,
                                        IntFunction<P_OUT[]> generator);
}

注意一点(重复3次):

  • 这里把同步流称为同步处理|执行的流,"并行流"称为并发处理|执行的流,因为并行流有歧义,实际上只是并发执行,不是并行执行
  • 这里把同步流称为同步处理|执行的流,"并行流"称为并发处理|执行的流,因为并行流有歧义,实际上只是并发执行,不是并行执行
  • 这里把同步流称为同步处理|执行的流,"并行流"称为并发处理|执行的流,因为并行流有歧义,实际上只是并发执行,不是并行执行

Sink和引用类型链#

PipelineHelper的几个方法中存在Sink这个接口,上一节没有分析,这一小节会详细展开。Stream在构建的时候虽然是一个双向链表的结构,但是在最终应用终结操作的时候,会把所有操作转化为引用类型链(ChainedReference),记得之前也提到过这种类似于多层包装器的编程模式,简化一下模型如下:

public class WrapperApp {

    interface Wrapper {

        void doAction();
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(0);
        Wrapper first = () -> System.out.printf("wrapper [depth => %d] invoke\n", counter.incrementAndGet());
        Wrapper second = () -> {
            first.doAction();
            System.out.printf("wrapper [depth => %d] invoke\n", counter.incrementAndGet());
        };
        second.doAction();
    }
}

// 控制台输出
wrapper [depth => 1] invoke
wrapper [depth => 2] invoke

上面的例子有点突兀,两个不同Sink的实现可以做到无感知融合,举另一个例子如下:

public interface Sink<T> extends Consumer<T> {

    default void begin(long size) {

    }

    default void end() {

    }

    abstract class ChainedReference<T, OUT> implements Sink<T> {

        protected final Sink<OUT> downstream;

        public ChainedReference(Sink<OUT> downstream) {
            this.downstream = downstream;
        }
    }
}

@SuppressWarnings({"unchecked", "rawtypes"})
public class ReferenceChain<OUT, R> {

    /**
     * sink chain
     */
    private final List<Supplier<Sink<?>>> sinkBuilders = new ArrayList<>();

    /**
     * current sink
     */
    private final AtomicReference<Sink> sinkReference = new AtomicReference<>();

    public ReferenceChain<OUT, R> filter(Predicate<OUT> predicate) {
        //filter
        sinkBuilders.add(() -> {
            Sink<OUT> prevSink = (Sink<OUT>) sinkReference.get();
            Sink.ChainedReference<OUT, OUT> currentSink = new Sink.ChainedReference<>(prevSink) {

                @Override
                public void accept(OUT out) {
                    if (predicate.test(out)) {
                        downstream.accept(out);
                    }
                }
            };
            sinkReference.set(currentSink);
            return currentSink;
        });
        return this;
    }

    public ReferenceChain<OUT, R> map(Function<OUT, R> function) {
        // map
        sinkBuilders.add(() -> {
            Sink<R> prevSink = (Sink<R>) sinkReference.get();
            Sink.ChainedReference<OUT, R> currentSink = new Sink.ChainedReference<>(prevSink) {

                @Override
                public void accept(OUT in) {
                    downstream.accept(function.apply(in));
                }
            };
            sinkReference.set(currentSink);
            return currentSink;
        });
        return this;
    }

    public void forEachPrint(Collection<OUT> collection) {
        forEachPrint(collection, false);
    }

    public void forEachPrint(Collection<OUT> collection, boolean reverse) {
        Spliterator<OUT> spliterator = collection.spliterator();
        // 这个是类似于terminal op
        Sink<OUT> sink = System.out::println;
        sinkReference.set(sink);
        Sink<OUT> stage = sink;
        // 反向包装 -> 正向遍历
        if (reverse) {
            for (int i = 0; i <= sinkBuilders.size() - 1; i++) {
                Supplier<Sink<?>> supplier = sinkBuilders.get(i);
                stage = (Sink<OUT>) supplier.get();
            }
        } else {
            // 正向包装 -> 反向遍历
            for (int i = sinkBuilders.size() - 1; i >= 0; i--) {
                Supplier<Sink<?>> supplier = sinkBuilders.get(i);
                stage = (Sink<OUT>) supplier.get();
            }
        }
        Sink<OUT> finalStage = stage;
        spliterator.forEachRemaining(finalStage);
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        list.add(1);
        list.add(2);
        list.add(3);
        list.add(12);
        ReferenceChain<Integer, Integer> chain = new ReferenceChain<>();
        // filter -> map -> for each
        chain.filter(item -> item > 10)
                .map(item -> item * 2)
                .forEachPrint(list);
    }
}

// 输出结果
24

执行的流程如下:

多层包装器的编程模式的核心要领就是:

  • 绝大部分操作可以转换为java.util.function.Consumer的实现,也就是实现accept(T t)方法完成对传入的元素进行处理
  • 先处理的Sink总是以后处理的Sink为入参,在自身处理方法中判断和回调传入的Sink的处理方法回调,也就是构建引用链的时候,需要从后往前构建,这种方式的实现逻辑可以参考AbstractPipeline#wrapSink(),例如:
// 目标顺序:filter -> map
Sink mapSink = new Sink(inputSink){

    private Function mapper;

    public void accept(E ele) {
        inputSink.accept(mapper.apply(ele))
    }

}
Sink filterSink = new Sink(mapSink){

    private Predicate predicate;

    public void accept(E ele) {
        if(predicate.test(ele)){
            mapSink.accept(ele);
        }
    }
}
  • 由上一点得知,一般来说,最后的终结操作会应用在引用链的第一个Sink上

上面的代码并非笔者虚构出来,可见java.util.stream.Sink的源码:

// 继承自Consumer,主要是继承函数式接口方法void accept(T t)
interface Sink<T> extends Consumer<T> {
    
    // 重置当前Sink的状态(为了接收一个新的数据集),传入的size是推送到downstream的准确数据量,无法评估数据量则传入-1
    default void begin(long size) {}

    // 
    default void end() {}

    // 返回true的时候表示当前的Sink不会接收数据
    default boolean cancellationRequested() {
        return false;
    }

    // 特化方法,接受一个int类型的值
    default void accept(int value) {
        throw new IllegalStateException("called wrong accept method");
    }

    // 特化方法,接受一个long类型的值
    default void accept(long value) {
        throw new IllegalStateException("called wrong accept method");
    }

    // 特化方法,接受一个double类型的值
    default void accept(double value) {
        throw new IllegalStateException("called wrong accept method");
    }
    
    // 引用类型链,准确来说是Sink链
    abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
        
        // 下一个Sink
        protected final Sink<? super E_OUT> downstream;

        public ChainedReference(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }
    // 暂时忽略Int、Long、Double的特化类型场景
}

如果用过RxJava或者Project-Reactor,Sink更像是Subscriber,多个Subscriber组成了ChainedReference(Sink Chain,可以理解为一个复合的Subscriber),而Terminal Op则类似于Publisher,只有在Subscriber订阅Publisher的时候才会进行数据的处理,这里是应用了Reactive编程模式。

AbstractPipeline和ReferencePipeline的实现#

AbstractPipeline和ReferencePipeline都是抽象类,AbstractPipeline用于构建Pipeline的数据结构,提供一些Shape相关的抽象方法给ReferencePipeline实现,而ReferencePipeline就是Stream中Pipeline的基础类型,从源码上看,Stream链式(管道式)结构的头节点和操作节点都是ReferencePipeline的子类。先看AbstractPipeline的成员变量和构造函数:

abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
    
    // 流管道链式结构的头节点(只有当前的AbstractPipeline引用是头节点,此变量才会被赋值,非头节点为NULL)
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline sourceStage;
    
    // 流管道链式结构的upstream,也就是上一个节点,如果是头节点此引用为NULL
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline previousStage;
    
    // 合并数据源的标志和操作标志的掩码
    protected final int sourceOrOpFlags;
    
    // 流管道链式结构的下一个节点,如果是头节点此引用为NULL
    @SuppressWarnings("rawtypes")
    private AbstractPipeline nextStage;
    
    // 流的深度
    // 串行执行的流中,表示当前流管道实例中中间操作节点的个数(除去头节点和终结操作)
    // 并发执行的流中,表示当前流管道实例中中间操作节点和前一个有状态操作节点之间的节点个数
    private int depth;
     
    // 合并了所有数据源的标志、操作标志和当前的节点(AbstractPipeline)实例的标志,也就是当前的节点可以基于此属性得知所有支持的标志
    private int combinedFlags;
    
    // 数据源的Spliterator实例
    private Spliterator<?> sourceSpliterator;
    
    // 数据源的Spliterator实例封装的Supplier实例
    private Supplier<? extends Spliterator<?>> sourceSupplier;
    
    // 标记当前的流节点是否被连接或者消费掉,不能重复连接或者重复消费
    private boolean linkedOrConsumed;

    // 标记当前的流管道链式结构中是否存在有状态的操作节点,这个属性只会在头节点中有效
    private boolean sourceAnyStateful;
    
    // 数据源关闭动作,这个属性只会在头节点中有效,由sourceStage持有
    private Runnable sourceCloseAction;
    
    // 标记当前流是否并发执行
    private boolean parallel;

    // 流管道结构头节点的父构造方法,使用数据源的Spliterator实例封装的Supplier实例
    AbstractPipeline(Supplier<? extends Spliterator<?>> source,
                     int sourceFlags, boolean parallel) {
        // 头节点的前驱节点置为NULL
        this.previousStage = null;
        this.sourceSupplier = source;
        this.sourceStage = this;
        // 合并传入的源标志和流标志的掩码
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        // The following is an optimization of:
        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
        // 初始化合并标志集合为sourceOrOpFlags和所有流操作标志的初始化值
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        // 深度设置为0
        this.depth = 0;
        this.parallel = parallel;
    }

    // 流管道结构头节点的父构造方法,使用数据源的Spliterator实例
    AbstractPipeline(Spliterator<?> source,
                     int sourceFlags, boolean parallel) {
        // 头节点的前驱节点置为NULL
        this.previousStage = null;
        this.sourceSpliterator = source;
        this.sourceStage = this;
        // 合并传入的源标志和流标志的掩码
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        // The following is an optimization of:
        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
        // 初始化合并标志集合为sourceOrOpFlags和所有流操作标志的初始化值
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        this.depth = 0;
        this.parallel = parallel;
    }
    
    // 流管道结构中间操作节点的父构造方法
    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        previousStage.linkedOrConsumed = true;
        // 设置前驱节点的后继节点引用为当前的AbstractPipeline实例
        previousStage.nextStage = this;
        // 设置前驱节点引用为传入的前驱节点实例
        this.previousStage = previousStage;
        // 合并传入的中间操作标志和流操作标志的掩码
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        // 合并标志集合为传入的标志和前驱节点的标志集合
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        // 赋值sourceStage为前驱节点的sourceStage
        this.sourceStage = previousStage.sourceStage;
        if (opIsStateful())
            // 标记当前的流存在有状态操作
            sourceStage.sourceAnyStateful = true;
        // 深度设置为前驱节点深度加1
        this.depth = previousStage.depth + 1;
    }

    // 省略其他方法
}

至此,可以看出流管道的数据结构:

Terminal Op不参与管道链式结构的构建。接着看AbstractPipeline中的终结求值方法(Terminal evaluation methods):

abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {

    // 省略其他方法
    
    // 基于终结操作进行求值,这个是Stream执行的常用核心方法,常用于collect()这类终结操作
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        // 判断linkedOrConsumed,以防多次终结求值,也就是每个终结操作只能执行一次
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
        
        // 如果当前流支持并发执行,则委托到TerminalOp.evaluateParallel(),如果当前流只支持同步执行,则委托到TerminalOp.evaluateSequential()
        // 这里注意传入到TerminalOp中的方法参数分别是this(PipelineHelper类型)和数据源Spliterator
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

    // 基于当前的流实例转换为最终的Node实例,传入的IntFunction用于创建数组实例
    // 此终结方法一般用于toArray()这类终结操作
    @SuppressWarnings("unchecked")
    final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        // If the last intermediate operation is stateful then
        // evaluate directly to avoid an extra collection step
        // 当前流支持并发执行,并且最后一个中间操作是有状态,则委托到opEvaluateParallel(),否则委托到evaluate(),这两个都是AbstractPipeline中的方法
        if (isParallel() && previousStage != null && opIsStateful()) {
            // Set the depth of this, last, pipeline stage to zero to slice the
            // pipeline such that this operation will not be included in the
            // upstream slice and upstream operations will not be included
            // in this slice
            depth = 0;
            return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
        }
        else {
            return evaluate(sourceSpliterator(0), true, generator);
        }
    }

    // 这个方法比较简单,就是获取当前流的数据源所在的Spliterator,并且确保流已经消费,一般用于forEach()这类终结操作
    final Spliterator<E_OUT> sourceStageSpliterator() {
        if (this != sourceStage)
            throw new IllegalStateException();

        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        if (sourceStage.sourceSpliterator != null) {
            @SuppressWarnings("unchecked")
            Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
            sourceStage.sourceSpliterator = null;
            return s;
        }
        else if (sourceStage.sourceSupplier != null) {
            @SuppressWarnings("unchecked")
            Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
            sourceStage.sourceSupplier = null;
            return s;
        }
        else {
            throw new IllegalStateException(MSG_CONSUMED);
        }
    }
    // 省略其他方法  
}

AbstractPipeline中实现了BaseStream的方法:

abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {

    // 省略其他方法
    
    // 设置头节点的parallel属性为false,返回自身实例,表示当前的流是同步执行的
    @Override
    @SuppressWarnings("unchecked")
    public final S sequential() {
        sourceStage.parallel = false;
        return (S) this;
    }
    
    // 设置头节点的parallel属性为true,返回自身实例,表示当前的流是并发执行的
    @Override
    @SuppressWarnings("unchecked")
    public final S parallel() {
        sourceStage.parallel = true;
        return (S) this;
    }
    
    // 流关闭操作,设置linkedOrConsumed为true,数据源的Spliterator相关引用置为NULL,置空并且回调sourceCloseAction钩子实例
    @Override
    public void close() {
        linkedOrConsumed = true;
        sourceSupplier = null;
        sourceSpliterator = null;
        if (sourceStage.sourceCloseAction != null) {
            Runnable closeAction = sourceStage.sourceCloseAction;
            sourceStage.sourceCloseAction = null;
            closeAction.run();
        }
    }
    
    // 返回一个添加了close处理器的Stream实例,close处理器会在下面的close方法中回调
    // 如果本来持有的引用sourceStage.sourceCloseAction非空,会使用传入的closeHandler与sourceStage.sourceCloseAction进行合并
    @Override
    @SuppressWarnings("unchecked")
    public S onClose(Runnable closeHandler) {
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        Objects.requireNonNull(closeHandler);
        Runnable existingHandler = sourceStage.sourceCloseAction;
        sourceStage.sourceCloseAction =
                (existingHandler == null)
                ? closeHandler
                : Streams.composeWithExceptions(existingHandler, closeHandler);
        return (S) this;
    }
 
    // Primitive specialization use co-variant overrides, hence is not final
    // 返回当前流实例中所有元素的Spliterator实例
    @Override
    @SuppressWarnings("unchecked")
    public Spliterator<E_OUT> spliterator() {
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        // 标记当前节点被链接或者消费
        linkedOrConsumed = true;
        // 如果当前节点为头节点,那么返回sourceStage.sourceSpliterator或者延时加载的sourceStage.sourceSupplier(延时加载封装由lazySpliterator实现)
        if (this == sourceStage) {
            if (sourceStage.sourceSpliterator != null) {
                @SuppressWarnings("unchecked")
                Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSpliterator;
                sourceStage.sourceSpliterator = null;
                return s;
            }
            else if (sourceStage.sourceSupplier != null) {
                @SuppressWarnings("unchecked")
                Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier;
                sourceStage.sourceSupplier = null;
                return lazySpliterator(s);
            }
            else {
                throw new IllegalStateException(MSG_CONSUMED);
            }
        }
        else {
            // 如果当前节点不是头节点,重新对sourceSpliterator进行包装,包装后的实例为WrappingSpliterator
            return wrap(this, () -> sourceSpliterator(0), isParallel());
        }
    }
    
    // 当前流实例是否并发执行,从头节点的parallel属性进行判断
    @Override
    public final boolean isParallel() {
        return sourceStage.parallel;
    }

    // 从当前combinedFlags中获取数据源标志和所有流中间操作标志的集合
    final int getStreamFlags() {
        return StreamOpFlag.toStreamFlags(combinedFlags);
    }

    /**
     * Get the source spliterator for this pipeline stage.  For a sequential or
     * stateless parallel pipeline, this is the source spliterator.  For a
     * stateful parallel pipeline, this is a spliterator describing the results
     * of all computations up to and including the most recent stateful
     * operation.
     */
    @SuppressWarnings("unchecked")
    private Spliterator<?> sourceSpliterator(int terminalFlags) {
        // 从sourceStage.sourceSpliterator或者sourceStage.sourceSupplier中获取当前流实例中的Spliterator实例,确保必定存在,否则抛出IllegalStateException
        Spliterator<?> spliterator = null;
        if (sourceStage.sourceSpliterator != null) {
            spliterator = sourceStage.sourceSpliterator;
            sourceStage.sourceSpliterator = null;
        }
        else if (sourceStage.sourceSupplier != null) {
            spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
            sourceStage.sourceSupplier = null;
        }
        else {
            throw new IllegalStateException(MSG_CONSUMED);
        }
        
        // 下面这段逻辑是对于并发执行并且存在有状态操作的节点,那么需要重新计算节点的深度和节点的合并标志集合
        // 这里只提一下计算过程,从头节点的后继节点开始遍历到当前节点,如果被遍历的节点时有状态的,那么对depth、combinedFlags和spliterator会进行重新计算
        // depth一旦出现有状态节点就会重置为0,然后从1重新开始增加
        // combinedFlags会重新合并sourceOrOpFlags、SHORT_CIRCUIT(如果sourceOrOpFlags支持)和Spliterator.SIZED
        // spliterator简单来看就是从并发执行的toArray()=>Array数组=>Spliterator实例
        if (isParallel() && sourceStage.sourceAnyStateful) {
            // Adapt the source spliterator, evaluating each stateful op
            // in the pipeline up to and including this pipeline stage.
            // The depth and flags of each pipeline stage are adjusted accordingly.
            int depth = 1;
            for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
                 u != e;
                 u = p, p = p.nextStage) {

                int thisOpFlags = p.sourceOrOpFlags;
                if (p.opIsStateful()) {
                    depth = 0;

                    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
                        // Clear the short circuit flag for next pipeline stage
                        // This stage encapsulates short-circuiting, the next
                        // stage may not have any short-circuit operations, and
                        // if so spliterator.forEachRemaining should be used
                        // for traversal
                        thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
                    }

                    spliterator = p.opEvaluateParallelLazy(u, spliterator);

                    // Inject or clear SIZED on the source pipeline stage
                    // based on the stage's spliterator
                    thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
                            ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
                            : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
                }
                p.depth = depth++;
                p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
            }
        }
        // 如果传入的terminalFlags标志不为0,则当前节点的combinedFlags会合并terminalFlags
        if (terminalFlags != 0)  {
            // Apply flags from the terminal operation to last pipeline stage
            combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
        }

        return spliterator;
    }

    // 省略其他方法
}

AbstractPipeline中实现了PipelineHelper的方法:

abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {

    // 省略其他方法
    
    // 获取数据源元素的类型,这里的类型包括引用、int、double和float
    // 其实实现上就是获取depth<=0的第一个节点的输出类型
    @Override 
    final StreamShape getSourceShape() {
        @SuppressWarnings("rawtypes")
        AbstractPipeline p = AbstractPipeline.this;
        while (p.depth > 0) {
            p = p.previousStage;
        }
        return p.getOutputShape();
    }
    
    // 基于当前节点的标志集合判断和返回流中待处理的元素数量,无法获取则返回-1
    @Override
    final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator) {
        return StreamOpFlag.SIZED.isKnown(getStreamAndOpFlags()) ? spliterator.getExactSizeIfKnown() : -1;
    }
    
    // 通过流管道链式结构构建元素引用链,再遍历元素引用链
    @Override
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }
    
    // 遍历元素引用链
    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);
        // 当前节点不支持SHORT_CIRCUIT(短路)特性,则直接遍历元素引用链,不支持短路跳出
        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            // 支持短路(中途取消)遍历元素引用链
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }
    
    // 支持短路(中途取消)遍历元素引用链
    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        @SuppressWarnings({"rawtypes","unchecked"})
        AbstractPipeline p = AbstractPipeline.this;
        // 基于当前节点,获取流管道链式结构中第最后一个depth=0的前驱节点
        while (p.depth > 0) {
            p = p.previousStage;
        }
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        // 委托到forEachWithCancel()进行遍历
        boolean cancelled = p.forEachWithCancel(spliterator, wrappedSink);
        wrappedSink.end();
        return cancelled;
    }
    
    // 返回当前节点的标志集合
    @Override
    final int getStreamAndOpFlags() {
        return combinedFlags;
    }
    
    // 当前节点标志集合中是否支持ORDERED
    final boolean isOrdered() {
        return StreamOpFlag.ORDERED.isKnown(combinedFlags);
    }
     
    // 构建元素引用链,生成一个多重包装的Sink(WrapSink),这里的逻辑可以看前面的分析章节
    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);
        // 这里遍历的时候,总是从当前节点向前驱节点遍历,也就是传入的sink实例总是包裹在最里面一层执行
        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }
    
    // 包装数据源的Spliterator,如果depth=0,则直接返回sourceSpliterator,否则返回的是延迟加载的WrappingSpliterator
    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator) {
        if (depth == 0) {
            return (Spliterator<E_OUT>) sourceSpliterator;
        }
        else {
            return wrap(this, () -> sourceSpliterator, isParallel());
        }
    }
    
    // 计算Node实例,这个方法用于toArray()方法系列,是一个终结操作,下面会另开章节详细分析
    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
                                      boolean flatten,
                                      IntFunction<E_OUT[]> generator) {
        if (isParallel()) {
            // @@@ Optimize if op of this pipeline stage is a stateful op
            return evaluateToNode(this, spliterator, flatten, generator);
        }
        else {
            Node.Builder<E_OUT> nb = makeNodeBuilder(
                    exactOutputSizeIfKnown(spliterator), generator);
            return wrapAndCopyInto(nb, spliterator).build();
        }
    }

    // 省略其他方法    
}

AbstractPipeline中剩余的待如XXYYZZPipeline等子类实现的抽象方法:

abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {

    // 省略其他方法

    // 获取当前流的输出"形状",REFERENCE、INT_VALUE、LONG_VALUE或者DOUBLE_VALUE
    abstract StreamShape getOutputShape();

    // 收集当前流的所有输出元素,转化为一个适配当前流输出"形状"的Node实例
    abstract <P_IN> Node<E_OUT> evaluateToNode(PipelineHelper<E_OUT> helper,
                                               Spliterator<P_IN> spliterator,
                                               boolean flattenTree,
                                               IntFunction<E_OUT[]> generator);
    
    // 包装Spliterator为WrappingSpliterator实例
    abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph,
                                            Supplier<Spliterator<P_IN>> supplier,
                                            boolean isParallel);
    
    // 包装Spliterator为DelegatingSpliterator实例
    abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph,
                                            Supplier<Spliterator<P_IN>> supplier,
                                            boolean isParallel);
    // 基于Sink遍历Spliterator中的元素,支持取消操作,简单理解就是支持cancel的tryAdvance方法
    abstract boolean forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink);

    // 返回Node的建造器实例,用于toArray方法系列
    abstract Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown,
                                                 IntFunction<E_OUT[]> generator);
    
    // 判断当前的操作(节点)是否有状态,如果是有状态的操作,必须覆盖opEvaluateParallel方法
    abstract boolean opIsStateful();
    
    // 当前操作生成的结果会作为传入的Sink实例的入参,这是一个包装Sink的过程,通俗理解就是之前提到的元素引用链添加一个新的链节点,这个方法算是流执行的一个核心方法
    abstract Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);    
    
    // 并发执行的操作节点求值
    <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
                                          Spliterator<P_IN> spliterator,
                                          IntFunction<E_OUT[]> generator) {
        throw new UnsupportedOperationException("Parallel evaluation is not supported");
    }

    // 并发执行的操作节点惰性求值
    @SuppressWarnings("unchecked")
    <P_IN> Spliterator<E_OUT> opEvaluateParallelLazy(PipelineHelper<E_OUT> helper,
                                                     Spliterator<P_IN> spliterator) {
        return opEvaluateParallel(helper, spliterator, i -> (E_OUT[]) new Object[i]).spliterator();
    }

    // 省略其他方法
}

这里提到的抽象方法opWrapSink()其实就是元素引用链的添加链节点的方法,它的实现逻辑见子类,这里只考虑非特化子类ReferencePipeline的部分源码:

abstract class ReferencePipeline<P_IN, P_OUT>
        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT>  {

    // 构造函数,用于头节点,传入基于Supplier封装的Spliterator实例作为数据源,数据源的标志集合和是否支持并发执行的判断标记
    ReferencePipeline(Supplier<? extends Spliterator<?>> source,
                      int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }

    // 构造函数,用于头节点,传入Spliterator实例作为数据源,数据源的标志集合和是否支持并发执行的判断标记
    ReferencePipeline(Spliterator<?> source,
                      int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }

    // 构造函数,用于中间节点,传入上一个流管道节点的实例(前驱节点)和当前操作节点支持的标志集合
    ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
        super(upstream, opFlags);
    }
    
    // 这里流的输出"形状"固定为REFERENCE
    @Override
    final StreamShape getOutputShape() {
        return StreamShape.REFERENCE;
    }
    
    // 转换当前流实例为Node实例,应用于toArray方法,后面详细分析终结操作的时候再展开
    @Override
    final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,
                                        Spliterator<P_IN> spliterator,
                                        boolean flattenTree,
                                        IntFunction<P_OUT[]> generator) {
        return Nodes.collect(helper, spliterator, flattenTree, generator);
    }
    
    // 包装Spliterator=>WrappingSpliterator
    @Override
    final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
                                     Supplier<Spliterator<P_IN>> supplier,
                                     boolean isParallel) {
        return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);
    }
    
    // 包装Spliterator=>DelegatingSpliterator,实现惰性加载
    @Override
    final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) {
        return new StreamSpliterators.DelegatingSpliterator<>(supplier);
    }
    
    // 遍历Spliterator中的元素,基于传入的Sink实例进行处理,支持Cancel操作
    @Override
    final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
        boolean cancelled;
        do { } while (!(cancelled = sink.cancellationRequested()) && spliterator.tryAdvance(sink));
        return cancelled;
    }
    
    // 构造Node建造器实例
    @Override
    final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
        return Nodes.builder(exactSizeIfKnown, generator);
    }
 
    // 基于当前流的Spliterator生成迭代器实例
    @Override
    public final Iterator<P_OUT> iterator() {
        return Spliterators.iterator(spliterator());
    }
    
    // 省略其他OP的代码
    
    // 流管道结构的头节点
    static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
        
        // 构造函数,用于头节点,传入基于Supplier封装的Spliterator实例作为数据源,数据源的标志集合和是否支持并发执行的判断标记
        Head(Supplier<? extends Spliterator<?>> source,
             int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }
        
        // 构造函数,用于头节点,传入Spliterator实例作为数据源,数据源的标志集合和是否支持并发执行的判断标记
        Head(Spliterator<?> source,
             int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }
        
        // 不支持判断是否状态操作
        @Override
        final boolean opIsStateful() {
            throw new UnsupportedOperationException();
        }
        
        // 不支持包装Sink实例
        @Override
        final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
            throw new UnsupportedOperationException();
        }

        // 区分同步异步执行forEach,同步则简单理解为调用Spliterator.forEachRemaining,异步则调用终结操作forEach
        @Override
        public void forEach(Consumer<? super E_OUT> action) {
            if (!isParallel()) {
                sourceStageSpliterator().forEachRemaining(action);
            }
            else {
                super.forEach(action);
            }
        }
        
        // 区分同步异步执行forEachOrdered,同步则简单理解为调用Spliterator.forEachRemaining,异步则调用终结操作forEachOrdered
        @Override
        public void forEachOrdered(Consumer<? super E_OUT> action) {
            if (!isParallel()) {
                sourceStageSpliterator().forEachRemaining(action);
            }
            else {
                super.forEachOrdered(action);
            }
        }
    }
    
    // 无状态操作节点的父类
    abstract static class StatelessOp<E_IN, E_OUT>
            extends ReferencePipeline<E_IN, E_OUT> {
        
        // 基于上一个节点引用、输入元素"形状"和当前节点支持的标志集合创建StatelessOp实例
        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                    StreamShape inputShape,
                    int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }
        
        // 操作状态标记设置为无状态
        @Override
        final boolean opIsStateful() {
            return false;
        }
    }

    // 有状态操作节点的父类
    abstract static class StatefulOp<E_IN, E_OUT>
            extends ReferencePipeline<E_IN, E_OUT> {

        // 基于上一个节点引用、输入元素"形状"和当前节点支持的标志集合创建StatefulOp实例
        StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
                   StreamShape inputShape,
                   int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }
        
        // 操作状态标记设置为有状态
        @Override
        final boolean opIsStateful() {
            return true;
        }
        
        // 前面也提到,节点操作异步求值的方法在无状态节点下必须覆盖,这里重新把这个方法抽象,子类必须实现
        @Override
        abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
                                                       Spliterator<P_IN> spliterator,
                                                       IntFunction<E_OUT[]> generator);
    }
} 

这里重重重点分析一下ReferencePipeline中的wrapSink方法实现:

final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);

    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}

入参是一个Sink实例,返回值也是一个Sink实例,里面的for循环是基于当前的AbstractPipeline节点向前遍历,直到depth为0的节点跳出循环,而depth为0意味着该节点必定为头节点,也就是该循环是遍历当前节点到头节点的后继节点,Sink是"向前包装的",也就是处于链后面的节点Sink总是会作为其前驱节点的opWrapSink()方法的入参,在同步执行流求值计算的时候,前驱节点的Sink处理完元素后就会通过downstream引用(其实就是后驱节点的Sink)调用其accept()把元素或者处理完的元素结果传递进去,激活下一个Sink,以此类推。另外,ReferencePipeline的三个内部类Head、StatelessOp和StatefulOp就是流的节点类,其中只有Head是非抽象类,代表流管道结构(或者说双向链表结构)的头节点,StatelessOp(无状态操作)和StatefulOp(有状态操作)的子类构成了流管道结构的操作节点或者是终结操作。在忽略是否有状态操作的前提下看ReferencePipeline,它只是流数据结构的承载体,表面上看到的双向链表结构在流的求值计算过程中并不会进行直接遍历每个节点进行求值,而是先转化成一个多层包装的Sink,也就是前文笔者提到的元素引用链后者前一句分析的Sink元素处理以及传递,正确来说应该是一个Sink栈或者Sink包装器,它的实现可以类比为现实生活中的洋葱,或者编程模式中的AOP编程模式。形象一点的描述如下:

Head(Spliterator) -> Op(filter) -> Op(map) -> Op(sorted) -> Terminal Op(forEach)

↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
forEach ele in Spliterator: 
    Sink[filter](ele){
        if filter process == true: 
            Sink[map](ele){
                ele = mapper(ele)
                Sink[sorted](ele){

                    var array 

                    begin: 
          
                    accept(ele):
                      add ele to array

                    end:
                      sort ele in array                      
                }
            }
    }

终结操作forEach是目前分析源码中最简单的实现,下面会详细分析每种终结操作的实现细节。

下一篇文章为:【13万字详细分析JDK中Stream的实现原理】最后一篇,读完本文有所收获的朋友们可以给小编点个关注,持续分享干货内容。

标签列表
最新留言