千家信息网

怎么理解Java 8并行流

发表于:2025-02-14 作者:千家信息网编辑
千家信息网最后更新 2025年02月14日,本篇内容主要讲解"怎么理解Java 8并行流",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"怎么理解Java 8并行流"吧!并行流认识和开启并行流什么是并行
千家信息网最后更新 2025年02月14日怎么理解Java 8并行流

本篇内容主要讲解"怎么理解Java 8并行流",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"怎么理解Java 8并行流"吧!

并行流

认识和开启并行流

什么是并行流: 并行流就是将一个流的内容分成多个数据块,并用不同的线程分别处理每个不同数据块的流。例如有这么一个需求:

有一个 List 集合,而 list 中每个 apple 对象只有重量,我们也知道 apple 的单价是 5元/kg,现在需要计算出每个 apple 的单价,传统的方式是这样:

List appleList = new ArrayList<>(); // 假装数据是从库里查出来的  for (Apple apple : appleList) {      apple.setPrice(5.0 * apple.getWeight() / 1000);  }

我们通过迭代器遍历 list 中的 apple 对象,完成了每个 apple 价格的计算。而这个算法的时间复杂度是 O(list.size()) 随着 list 大小的增加,耗时也会跟着线性增加。并行流可以大大缩短这个时间。

并行流处理该集合的方法如下:

appleList.parallelStream().forEach(apple -> apple.setPrice(5.0 * apple.getWeight() / 1000));

和普通流的区别是这里调用的 parallelStream() 方法。当然也可以通过 stream.parallel() 将普通流转换成并行流。推荐看下:Java 8 创建 Stream 的 10 种方式,更多可以关注Java技术栈公众号回复java获取系列教程。

并行流也能通过 sequential() 方法转换为顺序流,但要注意:流的并行和顺序转换不会对流本身做任何实际的变化,仅仅是打了个标记而已。并且在一条流水线上对流进行多次并行 / 顺序的转换,生效的是最后一次的方法调用

并行流如此方便,它的线程从那里来呢?有多少个?怎么配置呢?

并行流内部使用了默认的 ForkJoinPool 线程池。默认的线程数量就是处理器的核心数,而配置系统核心属性:java.util.concurrent.ForkJoinPool.common.parallelism 可以改变线程池大小。不过该值是全局变量。

改变他会影响所有并行流。目前还无法为每个流配置专属的线程数。一般来说采用处理器核心数是不错的选择

测试并行流的性能

为了更容易的测试性能,我们在每次计算完苹果价格后,让线程睡 1s,表示在这期间执行了其他 IO 相关的操作,并输出程序执行耗时,顺序执行的耗时:

public static void main(String[] args) throws InterruptedException {      List appleList = initAppleList();      Date begin = new Date();      for (Apple apple : appleList) {          apple.setPrice(5.0 * apple.getWeight() / 1000);          Thread.sleep(1000);      }      Date end = new Date();      log.info("苹果数量:{}个, 耗时:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);  }

并行版本

List appleList = initAppleList();  Date begin = new Date();  appleList.parallelStream()  .forEach(apple ->           {               apple.setPrice(5.0 * apple.getWeight() / 1000);               try {                   Thread.sleep(1000);               } catch (InterruptedException e) {                   e.printStackTrace();               }           }          );  Date end = new Date();  log.info("苹果数量:{}个, 耗时:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);

耗时情况

跟我们的预测一致,我的电脑是 四核I5 处理器,开启并行后四个处理器每人执行一个线程,最后 1s 完成了任务!

并行流可以随便用吗?

可拆分性影响流的速度

通过上面的测试,有的人会轻易得到一个结论:并行流很快,我们可以完全放弃 foreach/fori/iter 外部迭代,使用 Stream 提供的内部迭代来实现了。

事实真的是这样吗?并行流真的如此完美吗?答案当然是否定的。大家可以复制下面的代码,在自己的电脑上测试。测试完后可以发现,并行流并不总是最快的处理方式。

  1. 鸿蒙官方战略合作共建--HarmonyOS技术社区

  2. 对于 iterate 方法来处理的前 n 个数字来说,不管并行与否,它总是慢于循环的,非并行版本可以理解为流化操作没有循环更偏向底层导致的慢。可并行版本是为什么慢呢?这里有两个需要注意的点:

2. iterate 生成的是装箱的对象,必须拆箱成数字才能求和

3. 我们很难把 iterate 分成多个独立的块来并行执行

这个问题很有意思,我们必须意识到某些流操作比其他操作更容易并行化。对于 iterate 来说,每次应用这个函数都要依赖于前一次应用的结果。因此在这种情况下,我们不仅不能有效的将流划分成小块处理。反而还因为并行化再次增加了开支。

4. 而对于 LongStream.rangeClosed() 方法来说,就不存在 iterate 的第两个痛点了。它生成的是基本类型的值,不用拆装箱操作,另外它可以直接将要生成的数字 1 - n 拆分成 1 - n/4, 1n/4 - 2n/4, ... 3n/4 - n 这样四部分。因此并行状态下的 rangeClosed() 是快于 for 循环外部迭代的

package lambdasinaction.chap7;  import java.util.stream.*;  public class ParallelStreams {      public static long iterativeSum(long n) {          long result = 0;          for (long i = 0; i <= n; i++) {              result += i;          }          return result;      }      public static long sequentialSum(long n) {          return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get();      }      public static long parallelSum(long n) {          return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get();      }      public static long rangedSum(long n) {          return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong();      }      public static long parallelRangedSum(long n) {          return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong();      }  }  package lambdasinaction.chap7;  import java.util.concurrent.*;  import java.util.function.*;  public class ParallelStreamsHarness {      public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool();      public static void main(String[] args) {          System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 10_000_000L) + " msecs");          System.out.println("Sequential Sum done in: " + measurePerf(ParallelStreams::sequentialSum, 10_000_000L) + " msecs");          System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs" );          System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs");          System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs" );      }      public static  long measurePerf(Function f, T input) {          long fastest = Long.MAX_VALUE;          for (int i = 0; i < 10; i++) {              long start = System.nanoTime();              R result = f.apply(input);              long duration = (System.nanoTime() - start) / 1_000_000;              System.out.println("Result: " + result);              if (duration < fastest) fastest = duration;          }          return fastest;      }  }

共享变量修改的问题

并行流虽然轻易的实现了多线程,但是仍未解决多线程中共享变量的修改问题。下面代码中存在共享变量 total,分别使用顺序流和并行流计算前n个自然数的和

public static long sideEffectSum(long n) {      Accumulator accumulator = new Accumulator();      LongStream.rangeClosed(1, n).forEach(accumulator::add);      return accumulator.total;  }  public static long sideEffectParallelSum(long n) {      Accumulator accumulator = new Accumulator();      LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);      return accumulator.total;  }  public static class Accumulator {      private long total = 0;      public void add(long value) {          total += value;      }  }

顺序执行每次输出的结果都是:50000005000000,而并行执行的结果却五花八门了。这是因为每次访问 totle 都会存在数据竞争,关于数据竞争的原因,大家可以看看关于 volatile 的博客。因此当代码中存在修改共享变量的操作时,是不建议使用并行流的。

并行流的使用注意

在并行流的使用上有下面几点需要注意:

  • 尽量使用 LongStream / IntStream / DoubleStream 等原始数据流代替 Stream 来处理数字,以避免频繁拆装箱带来的额外开销

  • 要考虑流的操作流水线的总计算成本,假设 N 是要操作的任务总数,Q 是每次操作的时间。N * Q 就是操作的总时间,Q 值越大就意味着使用并行流带来收益的可能性越大

例如:前端传来几种类型的资源,需要存储到数据库。每种资源对应不同的表。我们可以视作类型数为 N,存储数据库的网络耗时 + 插入操作耗时为 Q。一般情况下网络耗时都是比较大的。因此该操作就比较适合并行处理。当然当类型数目大于核心数时,该操作的性能提升就会打一定的折扣了。更好的优化方法在日后的博客会为大家奉上

  • 对于较少的数据量,不建议使用并行流

  • 容易拆分成块的流数据,建议使用并行流

以下是一些常见的集合框架对应流的可拆分性能表:

可拆分性
ArrayList极佳
LinkedList
IntStream.range极佳
Stream.iterate
HashSet
TreeSet

到此,相信大家对"怎么理解Java 8并行流"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

数据 线程 处理 方法 顺序 变量 测试 处理器 性能 数字 时间 核心 类型 迭代 不同 代码 内容 对象 就是 建议 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 软件开发过程改进之我见 辽宁知名软件开发价格 中国石化网络安全制度体系介绍 网络安全哪个部分重要 java软件开发内部 第五届强网杯网络安全大赛视频 服务器虚拟化缺点 网络安全法普法网专题图解一 程序员被黑客攻击服务器 未落实网络安全义务 网络安全技能大赛理论题库 app软件开发业务咨询 重庆惠普服务器虚拟化价格 联想服务器安装教程 销售商品数据库的建立 教育培训软件开发平台 关系数据库的行可以任意交换 将表复制到另一个数据库 互联网对科技商业的影响 数据库创建教师表的命令 中国陨石数据库1719 乐一途互联网科技有限公司 哈利波特好友不同服务器 朱鲁华网络安全 香港互联网科技公司排行榜 数据库结构冲突解决方案 北京软件开发游戏培训 保障网络安全申论范文 软件开发遇到的问题有哪些 长春大学网络安全学院怎么样
0