如何将普通的Thread多线程改为Java8的parallelStream并发流
这篇文章将为大家详细讲解有关如何将普通的Thread多线程改为Java8的parallelStream并发流,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
概括总结
Java8的parallelStream
并发流能达到跟多线程类似的效果,但它也不是什么善茬,为了得到跟上一版本的多线程类似的效果,一改再改,虽然最后改出来了,但是还是存在理解不了的地方。
怎样得到一个parallelStream
理论上,你需要先有一个List>
,任意类型的List都行,然后调用它的.parallelStream()
方法就可以了。
对我这个例子来说,元素的类型不重要,因此选择了Integer
类型,核心代码如下:
AtomicInteger atomicInteger = new AtomicInteger(0);return Arrays.asList(new Integer[size]).parallelStream().map(i -> atomicInteger.incrementAndGet());
值得注意的是,第一行用的是AtomicInteger
而不是Integer
,因为Integer
会存在并发问题
第二行的意思是:新建一个大小为size
的数组,把数组转成List
,再把List转成parallelStream
,再把列表中的元素初始化成递增的整数,最后返回。
为什么说parallelStream不是什么善茬
简单来说,我认为原因是:因为它的默认值适用的场景是CPU密集型
的,而一般的Web项目是IO密集型
的(一般的Web项目都是需要跟数据库打交道的,针对数据库的操作主要就都是IO
,而对CPU的消耗并不高)。
当不能使用默认值的时候,就需要开发人员额外去了解parallelStream
的用法,而这些资料还不是特别好找。比如说:parallelStream
默认的并发线程数是多少?怎么修改默认的线程数?
我最终找到了这篇问答:Custom thread pool in Java 8 parallel stream,供参考。
问题1: 在Java代码中,怎样获取可用的CPU处理器的数量?代码如下:(在我的机器8核上结果是:8)
Runtime.getRuntime().availableProcessors()
问题2: parallelStream默认的并发线程数是多少?代码如下:(在我的机器上结果是:7)
ForkJoinPool.getCommonPoolParallelism()
问题3: 为什么parallelStream默认的并发线程数要比CPU处理器的数量少1个?
因为最优的策略是每个CPU处理器分配一个线程,然而主线程也算一个线程,所以要占一个名额。
问题4: 那如果电脑比较差,就只有1个CPU要怎么办?那就不管了,默认的并发线程数就是1,总不能为零吧。
问题5: 默认的并发线程数太少了,要怎么修改?如代码如下:(改成了20)
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
问题6: 默认的并发线程数可以反复修改吗?不能。因为java.util.concurrent.ForkJoinPool.common.parallelism
是final
类型的,整个JVM中只允许设置一次。
执行以下代码:
int a=5;for (;;) { System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "" + a++); System.out.println("ForkJoinPool.getCommonPoolParallelism() : " + ForkJoinPool.getCommonPoolParallelism()); if(a>7)break;}/**result:ForkJoinPool.getCommonPoolParallelism() : 5ForkJoinPool.getCommonPoolParallelism() : 5ForkJoinPool.getCommonPoolParallelism() : 5*/
问题7: 既然默认的并发线程数不能反复修改,那怎么进行不同线程数量的并发测试呢?答案是:引入ForkJoinPool
。用法如下:
new ForkJoinPool(threadCount).submit(() -> { parallelStream.forEach(i -> { // 这里省略提交订单的代码 });}).get();
问题8: java.util.concurrent.ForkJoinPool.common.parallelism
与new ForkJoinPool(threadCount)
之间有什么关系?答案是:不知道。
这个答案很让人失望,但是我确实没有查出来。我这边测试的结果是:
1.如果在new ForkJoinPool(threadCount)
之前没有设置java.util.concurrent.ForkJoinPool.common.parallelism
的值,那么new ForkJoinPool(threadCount)
的作用就不明显,即就是说,改变threadCount
的值对性能没有多大影响。
2.如果在之前设置了java.util.concurrent.ForkJoinPool.common.parallelism
的值,但是设置得比较小(比如32),则后续的new ForkJoinPool(threadCount)
的作用也不明显。
3.只有先把java.util.concurrent.ForkJoinPool.common.parallelism
的值设置得比较大(比如10000),后续的new ForkJoinPool(threadCount)
中threadCount
改变之后,才对性能有明显的影响。
问题9: 如果按问题8中的来修改,把java.util.concurrent.ForkJoinPool.common.parallelism
的值设置得比较大(比如10000),就意味不再适用于CPU密集型
的操作了,那应该怎么办呢?
答案是:每次都用new ForkJoinPool(threadCount)
,整体放弃使用默认的parallelStream
。(那多麻烦啊
你看,随随便便就有这么多问题,顿时就不想用了,是不是。
并发线程数量"与"每秒能提交的订单数量"之间的关系
这次测试的结果与上次测试的结果对比图如下:(红色为上一版本使用Thread测试的结果,黑色为这一版本使用parallelStream测试的结果)
可以看到差别并不大。
【备注】:不同的机器上的测试结果会不一样,以上测试结果仅供参考。
关于如何将普通的Thread多线程改为Java8的parallelStream并发流就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。