JAVA语言8的Stream原理详解
小标 2018-09-11 来源 : 阅读 972 评论 0

摘要:本文主要向大家介绍了JAVA语言8的Stream原理详解,通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助。

本文主要向大家介绍了JAVA语言8的Stream原理详解,通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助。

常用的流操作
在深入原理之前,我们有必要知道关于Stream的一些基础知识,关于Stream的操作分类,如表1-1所示。
表1-1 Stream的常用操作分类(表格引自这里)

如表1-1中所示,Stream中的操作可以分为两大类:中间操作与结束操作,中间操作只是对操作进行了记录,只有结束操作才会触发实际的计算(即惰性求值),这也是Stream在迭代大集合时高效的原因之一。中间操作又可以分为无状态(Stateless)操作与有状态(Stateful)操作,前者是指元素的处理不受之前元素的影响;后者是指该操作只有拿到所有元素之后才能继续下去。结束操作又可以分为短路与非短路操作,这个应该很好理解,前者是指遇到某些符合条件的元素就可以得到最终结果;而后者是指必须处理所有元素才能得到最终结果。
原理探秘
在探究Stream的执行原理之前,我们先看如下两段代码(本文将以code_1为例进行说明):
code_1

public static void main(String[] args) {    List list = Lists.newArrayList(            
"bcd", "cde", "def", "abc");    List result = list.stream()            
//.parallel()           
 .filter(e -> e.length() >= 3)           
 .map(e -> e.charAt(0))            
//.peek(System.out :: println)            
//.sorted()           
 //.peek(e -> System.out.println("++++" + e))            
.map(e -> String.valueOf(e))           
 .collect(Collectors.toList());    System.out.println("----------------------------");    System.out.println(result);}code_2public void targetMethod() {    List list = Lists.newArrayList(            
"bcd", "cde", "def", "abc");    List result = Lists.newArrayListWithCapacity(list.size());    for (String str : list) {       
 if (str.length() >= 3) {            
char e = str.charAt(0);            
String tempStr = String.valueOf(e);            
result.add(tempStr);        
}    }    System.out.println("----------------------------");    System.out.println(result);}


很明显,在最终结果上而言,code_1与code_2是等价的。那么,Stream是怎么做的呢?显然不是每次操作都进行迭代,因为这对于执行时间与存储中间变量来说都将是噩梦。
要解决的问题
显然,如果code_2只对集合迭代了一次,也就是说相当高效。那么这么做有没有弊端?有!模板代码、中间变量、不利于并行都是其存在的问题。但是按着code_2的思路可以知道有以下几个问题需要解决:
如何记录每次操作?操作如何叠加?叠加后的操作如何执行?最后的结果如何存储?
包结构分析
那么Stream是如何解决的呢?所谓源码之下,无所遁形。那么,首先来看一下Stream包的结构(如图1-1所示)。

图1-1 Stream包的结构示意图
其中各个部分的主要功能为:
主要是各种操作的工厂类、数据的存储结构以及收集器的工厂类等;主要用于Stream的惰性求值实现;Stream的并行计算框架;存储并行流的中间结果;终结操作的定义;
我们单独把第二部分拎出来用于说明Stream的惰性求值实现,如图1-2所示,Java8针对Int、long、double进行了优化,主要用于频繁的拆装箱。我们以引用类型进行介绍,在图中已经标为绿色。
BaseStream规定了流的基本接口,比如iterator、spliterator、isParallel等;Stream中定义了map、filter、flatmap等用户关注的常用操作;PipelineHelper主要用于Stream执行过程中相关结构的构建;Head、StatelessOp、StatefulOp为ReferencePipeline中的内部类。

图1-2
操作如何记录
关于操作如何记录,在JDK源码注释中多次用(操作)stage来标识用户的每一次操作,而通常情况下Stream的操作又需要一个回调函数,所以一个完整的操作是由数据来源、操作、回调函数组成的三元组来表示。而在具体实现中,使用实例化的ReferencePipeline来表示,即图1-2中的Head、StatelessOp、StatefulOp的实例。
如code_3、code_4所示为调用stream.map()的关键的两个方法,在用户
调用一系列操作后会形成如图1-3所示的双链表结构。

图1-3

code_3(ReferencePipeline.map())@Override@SuppressWarnings("unchecked")public final  Stream map(Function mapper) {    
Objects.requireNonNull(mapper);    return new StatelessOp(this, StreamShape.REFERENCE,                                 
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {        @Override        Sink opWrapSink(int flags, Sink sink) {            return new Sink.ChainedReference(sink) {                @Override                public void accept(P_OUT u) {                    downstream.accept(mapper.apply(u));                }            };        }    };}code_4(AbstractPipeline.AbstractPipeline())AbstractPipeline(AbstractPipeline previousStage, int opFlags) {    if (previousStage.linkedOrConsumed)        throw new IllegalStateException(MSG_STREAM_LINKED);    previousStage.linkedOrConsumed = true;    previousStage.nextStage = this;     this.previousStage = previousStage;    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);    this.sourceStage = previousStage.sourceStage;    if (opIsStateful())        sourceStage.sourceAnyStateful = true;    this.depth = previousStage.depth + 1;}如何叠加在上一步已经在stage中记录了每一步操作,此时并没有执行。但是stage只是保存了当前的操作,并不能确定下一个stage需要何种操作,何种数据,其实JDK为此定义了Sink接口,其中只有begin()、end()、cancellationRequested()、accept()四个接口(如表1-2所示,摘自这里),其中中间操作的子类中包含一个指向下游sink的指针。表1-2现在转向code_3,可以看出,在satge链中,每一步都包含了opWrapSink()。当调用终结操作时,将会触发code_5从最后一个stage(终结操作产生的satge)开始,递归产生图1-4所示的结构。code_5(AbstractPipeline.wrapSink())@Override@SuppressWarnings("unchecked")final  Sink wrapSink(Sink sink) {    Objects.requireNonNull(sink);     for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);    }    return (Sink) sink;}



图1-4
如何执行
所有的操作已经形成了图1-4的结构,接下来就会触发code_6,此时结果就会产生对应的结果啦!

code_6(AbstractPipelie.copyInto())
@Overridefinal 
 void copyInto(Sink
 wrappedSink, Spliterator
 spliterator) {    Objects.requireNonNull(wrappedSink);     if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {        wrappedSink.begin(spliterator.getExactSizeIfKnown());        spliterator.forEachRemaining(wrappedSink);        wrappedSink.end();    }    else {        copyIntoWithCancel(wrappedSink, spliterator);    }}


并行原理
那么,Stream是如何并行执行的呢?其实产生stage链的过程和串行并没有区别,只是在最终执行时进行了相应的调整,我们将code_1改变为code_7
code_7

public static void main(String[] args) {    List
 list = Lists.newArrayList(            "bcd", "cde", "def", "abc");    List
 result = list.stream()            .parallel()            .filter(e -> e.length() >= 3)            //.map(e -> e.charAt(0))            //.peek(System.out :: println)            .sorted()            //.peek(e -> System.out.println("++++" + e))            .map(e -> String.valueOf(e))            .collect(Collectors.toList());    System.out.println("----------------------------");    System.out.println(result);}


那么最终产生的stage链与sink的结构如图1-5所示,因为此时stage链中有一个有状态操作(sorted()),也就是说在这里必须处理完所有元素才能进行下一步操作。那么此时无论是并行还是串行,此时都会产生两个sink链,也就是代表了两次迭代,才产生了最终结果。

图1-5
那么,究竟是如何并行的呢?其实当调用collect操作时会调用code_8,其中的evaluateParallel()如code_9所示。

code_8(AbstractPipeline.evaluate())
final 
 R evaluate(TerminalOp
 terminalOp) {    assert getOutputShape() == terminalOp.inputShape();    if (linkedOrConsumed)        throw new IllegalStateException(MSG_STREAM_LINKED);    linkedOrConsumed = true;     return isParallel()            terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));}
code_9(ReduceOp.evaluateParallel())
@Override    public 
 R evaluateParallel(PipelineHelper
 helper,                                     Spliterator
 spliterator) {        return new ReduceTask<>(this, helper, spliterator).invoke().get();    }


其实Stream的并行处理是基于ForkJoin框架的,相关类与接口的结构如图1-6所示。其中AbstractShortCircuitTask用于处理短路操作,其他相关操作类似,会产生对应的Task。

图1-6
关于code_8中获取源Spliterator,如code_10所示,

code_10(AbstractPipeline.sourceSpliterator())
@SuppressWarnings("unchecked")private Spliterator sourceSpliterator(int terminalFlags) {    Spliterator spliterator = null;    if (sourceStage.sourceSpliterator != null) {        spliterator = sourceStage.sourceSpliterator;        sourceStage.sourceSpliterator = null;    }    else if (sourceStage.sourceSupplier != null) {        spliterator = (Spliterator) sourceStage.sourceSupplier.get();        sourceStage.sourceSupplier = null;    }    else {        throw new IllegalStateException(MSG_CONSUMED);    }     if (isParallel() && sourceStage.sourceAnyStateful) {        //如果是并行流并且有stage包含stateful操作        //那么就会依次遍历stage,直到遇到stateful stage时        int depth = 1;        for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;             u != e;             u = p, p = p.nextStage) {             int thisOpFlags = p.sourceOrOpFlags;            if (p.opIsStateful()) {                depth = 0;                 if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {                    //如果有短路操作,则去除相应标记                    thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;                }                //尽量以惰性求值的方式进行操作                spliterator = p.opEvaluateParallelLazy(u, spliterator);                 thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)                         (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED                        : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;            }            p.depth = depth++;            p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);        }    }     if (terminalFlags != 0)  {        // Apply flags from the terminal operation to last pipeline stage        combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);    }     return spliterator;}


如何并行执行
关于各个task就行是如何并行执行,其实最终调用的是code_11所示,对应的流程如图1-7所示,其中交替fork子节点是为了缓和数据分片不均造成的性能退化。
code_11(AbstractTask.compute())

@Overridepublic void compute() {    Spliterator
 rs = spliterator, ls; // right, left spliterators    long sizeEstimate = rs.estimateSize();    long sizeThreshold = getTargetSize(sizeEstimate);    boolean forkRight = false;    @SuppressWarnings("unchecked") K task = (K) this;    while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {        K leftChild, rightChild, taskToFork;        task.leftChild  = leftChild = task.makeChild(ls);        task.rightChild = rightChild = task.makeChild(rs);        task.setPendingCount(1);        if (forkRight) {            forkRight = false;            rs = ls;            task = leftChild;            taskToFork = rightChild;        }        else {            forkRight = true;            task = rightChild;            taskToFork = leftChild;        }        taskToFork.fork();        sizeEstimate = rs.estimateSize();    }    task.setLocalResult(task.doLeaf());    task.tryComplete();}



图1-7
影响并行流的因素
数据大小;源数据结构(分割越容易越好),arraylist、数组比较好,hashSet、treeSet次之,linked最差;装箱;核的数量(可使用);单元处理开销(越大越好)
建议:
终结操作以外的操作,尽量避免副作用,避免突变基于堆栈的引用,或者在执行过程中进行任何I/O;传递给流操作的数据源应该是互不干扰(避免修改数据源)。

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注编程语言JAVA频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 1 不喜欢 | 0
看完这篇文章有何感觉?已经有1人表态,100%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程