跳过正文
  1. 博客/
  2. 后端/
  3. 框架/

Stream源码(2):从问题出发看源码

4 分钟· ·
后端 框架 Java Stream
作者
Allen
一个强大、轻量级的 Hugo 主题。
目录

之前看一些开源项目源码的时候,发现一个问题,假如你贪全,一口气把整个代码看完,由于现在程序架构
比较复杂,很多功能被分成很多个组件来完成,有的时候你会被程序跳来跳去给弄晕,假如你但看一个小功能,你又
不知道为啥要用这个

所以这次准备尝试从问题入手,首先给自己提一些
问题,然后在从源代码中寻找答案,
在寻找答案的过程中会遇到更多问题,就这样打破砂锅问到底,最终没有问题了,这个时候你就差不多看懂了

0x00 问题
#

  1. distinct 操作过程中是否会将新加入的元素和历史元素一一比较?

为啥会有这个问题呢,因为在看源码

    Returns a stream consisting of the distinct elements (according to Object.equals(Object)) of this stream.
  
    For ordered streams, the selection of distinct elements is stable (for duplicated elements, the element appearing first in the encounter order is preserved.) For unordered streams, no stability guarantees are made.
  

这句话意思是会依靠`Object.equals(Object)` 来去重,我们知道`distinct`和`filter`都是中间操作
难道distinct会将每个元素和历史元素做一个Object.equals调用吗

假如这样做的话,那么这个操作就是O(n^2)的时间复杂度了,显然不太靠谱,我们查看distinct源码发现

最终distinct生成了一个StatefulOp ,而且这个类存在一个reduce函数,其中声明了一个

    TerminalOp<T, LinkedHashSet<T>> reduceOp
  
                    = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
  
                                                             LinkedHashSet::addAll);
  

我们可以看到这个变量是一个终止操作,其中使用LinkedHashSet来进行聚合,所以看到这我们就猜测
Stream没有这么傻,它声明了一个LinkedHashSet来存贮历史元素,这样只需要将加进来的元素进行
哈希计算,然后跟哈希碰撞的调用一下Object.equals(Object)函数就好了

我们看到这篇博文的实验也证明我们的猜测了`

接下来我们又有一个疑惑Stream内部是如何实现的呢?

0x01 猜测
#

我们首先从最小的代码来看,我们首先来看一个无状态的stream函数

    Stream.of(1L, 2L, 3L, 4L).forEach(System.out::println)
  

我们使用for循环用来实现也非常简单

           for (long l : new long[]{1L, 2L, 3L, 4L}) {
  
            System.out.println(l);
  
          }
  

接下来我们来思考,如何实现一个有状态的Stream

     Stream.of(1L, 2L, 3L, 4L).reduce(0L, Long::sum);
  

我们如何用for循环来实现呢,很简单,定义一个变量

    long begin = 0;
  
    for (long l : new long[]{1L, 2L, 3L, 4L}) {
  
        begin += l;
  
    }
  

我们能很容易写出一层for循环,但是Stream强大的地方在于,他可以穿插很多函数处理
比如:

    Stream.of(1L, 2L, 3L, 4L, 4L, 5L, 5L).distinct().filter(x -> x > 2).reduce(0, Long::sum);
  

我们简单的穿插了distinctfilter操作,我们接下来尝试使用for循环来实现上面的Stream

首先我们知道distinct 需要一个Set来过滤已经存在的,其中reduce需要一个初始量,那就好做了

    long start = 0L;
  
    for (long l : new long[]{1L, 2L, 3L, 4L}) {
  
        if(!set.contains(l)) {
  
            set.add(l);
  
            if(l > 2) {
  
                start += l;
  
            }
  
        }
  
    }
  

00x02 源码探究
#

接下来我们看看Stream内部如何实现这个for循环的,我们可以看到,其实.distinct().filter(x -> x > 2).reduce(0, Long::sum)对于每一层我们都需要能
创建一个Sink,对于这个for循环来说,都是把每个数据,我们把数据从一个sink到其他的sink

所有的Sink都实现了Consumer 接口,其中最重要的接口就是

    void accept(T t);
  

这个消费接口,我们可以理解“吃”数据,它会把我们传给它的值都“消化”掉

当我们在创建.distinct().filter(...)...这些stream的时候,我们做了什么呢,
我们每进行一次中间操作,我们都新建了一个流,其中我们通过upstream 这个变量指向
之前的流

当我们碰到终止操作比如reduce的时候,我们会进行一个回溯,把所有upstream都进行回溯,反过来把一个sink组装起来(每个sink指向它的上游)

// java.util.stream.AbstractPipeline.class
  
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
  
    Objects.requireNonNull(sink);
  
    // 回溯之前的stream流,创建sink,并让当前的sink指向上流
  
    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
  
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
  
    }
  
    return (Sink<P_IN>) sink;
  
}
  

最终我们得到了最上流的sink,以上面为例就是distinct那个sink

接下来我们执行for循环,其中最重要的就是

    // java.util.stream.AbstractPipeline.copyInto 函数
  
    // 执行for循环 其中 传入的sink就是 我们上面得到的像葫芦串一样的sink
  
    spliterator.forEachRemaining(wrappedSink)
  

我们只需要给wrappedSink传入for循环的值就好了,由于每个sink都有其上游的引用,比如说distinct的sink,
他会判断是否已经存贮在sink中,如果没有就往上游传,由于上游也是个sink,所以最终如果不传了或者到最上游了就继续下一for循环的值

0x03
#

总结,我们这边非常浅显的把源代码给介绍了一下,其实要想吃透最好使用debug功能,一行一行代码进行debug,这样就能印象更深刻

## 引用

相关文章

Stream源码(1):如何实现去重
3 分钟
后端 框架 Java Stream
Dubbo浅探
3 分钟
后端 框架 Java Dubbo
Spring Cloud Alibaba浅探
2 分钟
后端 框架 Java SpringBoot
SpringCloud浅析
5 分钟
后端 框架 Java SpringBoot
浅析Spring
3 分钟
后端 框架 Java SpringBoot
浅析微服务
5 分钟
后端 框架 Java