本文翻译自 Parallelizing Streams,版权归原作者所有。
优化流计算
Stream API 的一个非常令人兴奋的特性是流能够并行处理数据。使用 Stream API 并行处理数据非常简单,只需在任何现有流上调用 parallel() 方法即可。
int parallelSum = IntStream.range(0, 10)
.parallel()
.sum();
IO.println("Sum = " + parallelSum);
运行此代码会得到以下结果。
Sum = 45
这个求和实际上是并行计算的。不过在这么小的示例中,你可能不会注意到任何性能提升。
为什么要并行计算数据?可能是为了更快地得到计算结果。并行流会比顺序流更快地给出结果吗?这个问题的答案并不像听起来那么简单。在某些情况下是的,但在其他一些情况下,不幸的是,不会。尽管听起来令人失望,但并行流并不总是比顺序流快。
考虑到这一点,你应该谨慎:选择使用并行流不是一个可以轻易做出的决定。在考虑并行化之前,你需要问自己几个问题。
首先,问问自己,你需要它吗?你的应用程序中是否有未满足的性能要求?你确定性能问题来自你正在考虑并行计算的流处理吗?你计划如何衡量性能提升,以确保对这个特定计算并行化确实改善了应用程序的性能?
并行化会消耗更多的计算能力。你是否有空闲的 CPU 或 CPU 核心可以分配给这个计算?你能否在不减慢应用程序其余部分的情况下为计算分配更多 CPU 周期?
并行化会消耗线程。你是否有空闲线程可以分配给计算?如果你在 Web 服务器中运行的应用程序上工作,那么你的线程用于处理 HTTP 请求。你愿意将它们用于其他事情吗?
一旦你选择了并行化,那么你需要确保流计算的性能确实得到了改善。你应该在尽可能接近生产环境的上下文中测量这种性能提升。
在本教程中,我们涵盖了几个关键要素,这些要素将帮助你评估并行化可能带来的收益,以及一些应该让你对并行化保持警惕的其他要素。但归根结底,唯一能告诉你并行化是否值得的是测试和测量执行时间。
并行化实现
Stream API 中的并行化是通过递归分解流正在处理的数据来实现的。它构建在 JDK 7 中添加的 Fork/Join 框架之上。
分解包括将流正在处理的数据分成两部分。然后每个部分由自己的 CPU 核心处理,该核心可能决定再次递归分割它。
在某个时刻,框架将决定给定部分中的数据量足够小,可以正常处理。然后将处理这个数据子集,并计算部分结果。然后将这个部分结果与其他 CPU 核心上从其他部分计算的其他部分结果合并。
并行化确实会带来开销。与在多个 CPU 核心上分配计算的收益相比,这种开销必须很小。否则,并行化将恶化计算性能而不是改善它们。
让我们逐一检查所有这些步骤,看看什么可能阻止你获得更好的性能提升。
理解数据局部性
数据局部性会影响数据处理的速度,无论是顺序处理还是并行处理。局部性越好,计算就越快。
为了让 CPU 可用,你的数据必须从计算机的主内存传输到 CPU 的缓存。从物理上讲,主内存是计算机的一个特定组件,与 CPU 分离。另一方面,缓存与 CPU 的核心计算元素共享同一硅片。它们通过主板和不同的通信总线连接在一起。与 CPU 核心从其缓存访问数据的速度相比,将数据从主内存传输到 CPU 缓存非常慢。
当 CPU 需要某些数据时,它首先检查这些数据是否在其缓存中可用。如果可用,则可以立即使用。如果不可用,则必须从主内存中获取这些数据并复制到缓存。这种情况称为缓存未命中。缓存未命中的代价很高,因为在此期间 CPU 正在等待数据。你希望避免这种情况。
数据在主内存和 CPU 缓存之间传输的方式在避免缓存未命中方面起着重要作用。内存按行组织。通常,一行长 64 字节,即八个 long 值(这可能因 CPU 而异)。主内存和 CPU 缓存之间的所有传输都是逐行进行的。因此,即使你的 CPU 只需要一个 int 值,包含该值的整行也会传输到缓存。
迭代基本类型数组
假设你的代码正在迭代一个 int[] 类型的数组。64 字节的一行可以容纳 16 个 int 值。假设访问数组的第一个元素是缓存未命中。然后 CPU 将把包含此元素的行加载到其缓存中以开始迭代。因为它加载了完整的一行,接下来的 15 个值可能也已被传输。访问下一个值将非常快。
在这种情况下,数据局部性非常好:你的数据物理上存储在主内存的连续区域中。这是可取的,因为将数据从主内存传输到 CPU 缓存会快得多。
迭代 Integer 实例数组
现在假设你的代码正在迭代一个 Integer[] 类型的数组。你真正拥有的不再是基本类型数组,而是引用数组。此数组的每个单元格都包含对 Integer 类型对象的引用,该对象可以在内存中的任何位置。
如果对数组第一个元素的访问是缓存未命中,那么 CPU 将必须将包含此元素的行加载到其缓存中。它真正加载的是数组的前 16 个引用,假设第一个引用在行的开头。然后它必须加载第一个 Integer 对象,该对象可能在主内存中的其他地方,导致另一个缓存未命中。实际上,读取数组的每个 Integer 对象很可能也会导致缓存未命中。
在这种情况下,数据局部性不如前一个示例好:对数据的引用物理上存储在主内存的连续区域中,但你进行计算所需的值不是。这是不可取的,因为将你需要的值从主内存传输到 CPU 缓存比基本类型数组的情况慢得多。
迭代 Integer 实例的链表
让我们检查最后一种情况。现在假设你的代码正在迭代一个 LinkedList<Integer> 类型的列表。如果对第一个元素的访问是缓存未命中,那么 CPU 将把链表的第一个节点加载到其缓存中。该节点包含两个引用:第一个引用指向计算所需的值,第二个引用指向列表的下一个节点。这种情况比前一种更糟:访问列表的下一个值很可能会产生两次缓存未命中。
在这种情况下,数据局部性很糟糕:你的数据和对它们的引用都没有存储在主内存的连续区域中。访问你需要的元素将比我们检查的第一种情况慢得多。
避免指针追踪
必须跟随引用或指针来访问携带你需要的数据的正确元素称为指针追踪。指针追踪是你希望在应用程序中避免的事情,是许多性能问题的根源。迭代 int 值数组时不存在指针追踪。迭代 Integer 实例的链表时,它构成了主要的性能问题。
拆分数据源
如果你决定并行处理流,第一步将包括拆分数据源。为了使拆分高效,它应该具有几个属性。
- 拆分数据结构应该简单快速。
- 拆分应该均匀:你得到的两个子流应该有相同数量的数据要处理。
拆分 Collection 实例
ArrayList 是一个完美的拆分数据结构。你可以轻松获得中间元素,如果你按中间拆分数组,你确切地知道两个子数组中将有多少元素。
另一方面,LinkedList 不是一个好的拆分结构。到达中间元素需要逐个遍历列表的一半元素,这由于指针追踪而代价高昂。一旦到达那里,你可以获得具有正确元素数量的两个子列表。
HashSet 建立在桶数组之上,因此拆分此数组与拆分数组列表的内部数组相同。但数据在此数组中的存储方式不同。更难以以保证两个部分中元素数量相同的方式拆分此数组。你甚至可能最终得到一个空的子部分。
TreeSet 基于红黑树实现。它保证所有节点在其左右子节点上具有相同数量的元素。因此,将 TreeSet 实例拆分为两个均匀的子树很容易。不过,你仍然需要追踪指针来访问数据。
所有这些结构都在集合框架中使用,你可以获得它们各自携带的元素数量。
对于可以从中创建流的所有结构,情况并非如此。
拆分文本文件的行
Files.lines(path) 模式就是这种情况,它在本教程前面已经介绍过。它创建一个处理此 path 对象表示的文本文件行的流。在不分析文本文件的情况下,无法获得文本文件的行数。
Pattern.splitAsStream(line) 模式也是如此,我们也介绍过。它使用提供的模式从 line 的拆分创建流。同样,你无法预先知道在这样的流中将处理多少元素。
拆分范围或生成的流
数字的特化流也为你提供了创建流的模式。
IntStream.range(0, 10) 流很容易拆分。实际上,它看起来像一个可以按中间拆分的数字数组。每个部分中的元素数量是可预测的,这是可取的。
另一方面,Stream.generate() 和 Stream.iterate() 方法不会给你一个易于拆分的数据源。实际上,此源可能是无限的,仅受流中处理方式的限制。
让我们比较以下两种模式。
List<Integer> list1 = IntStream.range(0, 10).boxed()
.toList();
List<Integer> list2 = IntStream.iterate(0, i -> i + 1)
.limit(10).boxed()
.toList();
两个列表 list1 和 list2 相同,使用不同的模式创建。第一个很容易拆分,而第二个不是。主要原因是,在第二种模式中,知道第五个元素的值需要计算所有先前的元素。从这个意义上说,第二种模式看起来像一个链表,你需要访问前四个元素才能到达第五个元素。
拆分和分派工作
一旦你的数据源被拆分,那么两个子流必须在 CPU 的不同核心上处理,并行化才能有效。
这是由 Fork/Join 框架完成的。Fork/Join 框架处理一个线程池,在应用程序启动时创建,称为公共 Fork/Join 池。此池中的线程数与 CPU 的核心数对齐。此池中的每个线程都有一个等待队列,线程可以在其中存储任务。
- 池的第一个线程创建第一个任务。此任务的执行决定计算是否足够小可以顺序计算,还是太大应该拆分。
- 如果拆分,则创建两个子任务并存储在该线程的队列中。然后主任务等待两个子任务完成。在等待时,它也存储在此等待队列中。
- 如果进行计算,则产生结果。此结果是整个计算的部分结果。然后此任务将结果返回给创建它的主任务。
- 一旦任务获得了它创建的两个子任务的两个结果,它就可以合并它们以产生结果并将其返回给创建它的主任务。
在某个时刻,第一个主任务从其两个子任务获得两个部分结果。然后它能够合并它们并返回计算的最终结果。
到目前为止,唯一工作的线程是池的第一个线程,它由 Fork/Join 框架调用。Fork/Join 框架实现了另一种并发编程模式,称为工作窃取。池的空闲线程可以检查同一池的其他线程的等待队列以获取任务并处理它。
这就是这种情况下发生的事情。一旦第一个等待队列中的任务数量增加,其他线程就会窃取其中一些,处理它们,进一步拆分工作,并用更多任务填充它们自己的等待队列。此功能使池的所有线程保持忙碌。
这种工作窃取功能效果很好,但它有一个缺点:根据源的拆分方式以及任务如何从一个线程移动到另一个线程,你的数据可能以任何顺序处理。在某些情况下,这可能是一个问题。
处理子流
处理子流可能与处理完整流不同。两个元素可以使子流的处理不同:访问外部状态,以及从一个元素的处理到另一个元素携带状态。这两个元素将影响并行流的性能。
访问外部状态
Fork/Join 框架将你的计算拆分为许多子任务,每个子任务由池中的线程处理。
如果你顺序处理流,所有元素都在运行方法的线程中处理。如果你并行处理同一流,元素由公共 Fork/Join 池中的线程处理。
访问流外部的状态然后从另一个线程进行,可能导致竞态条件。
让我们考虑以下代码。不幸的是,你无法在浏览器中运行此代码,你需要将其复制并粘贴到 IDE 中以查看它是如何工作的。
Set<String> threadNames =
IntStream.range(0, 100)
// .parallel()
.mapToObj(index -> Thread.currentThread().getName())
.collect(Collectors.toSet());
IO.println("Thread names:");
threadNames.forEach(IO::println);
它产生的结果如下。
Thread names:
main
如果你取消注释 parallel() 调用,那么此流将并行执行。结果变为以下内容,在你自己的机器上可能会有所不同。
Thread names:
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-4
main
ForkJoinPool.commonPool-worker-5
对非并发外部元素的任何访问都可能导致竞态条件和数据不一致。让我们运行以下代码。
List<Integer> ints = new ArrayList<>();
IntStream.range(0, 1_000_000)
.parallel()
.forEach(ints::add);
IO.println("ints.size() = " + ints.size());
多次运行此代码可能会导致不同的结果,因为公共 Fork/Join 池的所有线程都试图在 ArrayList 实例中并发添加数据,而 ArrayList 不是线程安全的结构。看到正确结果的机会很小,你甚至可能得到 ArrayIndexOutOfBoundsException。使用任何非并发集合或映射运行此类代码会导致不可预测的结果,包括异常。
典型的运行会给你类似这样的结果。是的,这缺失了很多元素!
ints.size() = 387122
流修改其外部状态是一种反模式。
遇到顺序
在某些情况下,Stream API 中处理数据的顺序很重要。以下方法就是这种情况。
limit(n):将处理限制为此流的前n个元素。skip(n):跳过此流的前n个元素的处理。findFirst():查找流的第一个元素。
这三个方法需要记住流元素的处理顺序,并需要对元素进行计数以产生正确的结果。
它们被称为有状态操作,因为它们需要携带内部状态才能工作。
在并行流中,这种有状态操作会导致开销。例如,limit() 需要一个内部计数器才能正确工作。在并行情况下,这个内部计数器在不同线程之间共享。在线程之间共享可变状态代价高昂,应该避免。
理解并行计算流的开销
并行计算流会增加一些处理并行性的计算。这些元素有成本,你需要知道它们,以确保与并行化的好处相比,这种成本不会太高。
- 你的数据需要拆分。拆分可以便宜,也可以昂贵,这取决于你处理的数据。数据的糟糕局部性会使拆分变得昂贵。
- 拆分需要高效。它需要创建均匀拆分的子流。有些源可以轻松均匀拆分,有些则不能。
- 一旦拆分,实现将并发处理你的数据。你应该避免访问任何外部可变状态,也应避免具有内部共享可变状态。
- 然后必须合并部分结果。有些结果可以轻松合并。合并整数和很容易且便宜。合并集合也很容易。合并哈希映射更昂贵。
陈述正确使用并行流的一些规则
规则 1:不要因为有趣而优化;因为你有要求而你没有满足它们才优化。
规则 2:谨慎选择数据源。
规则 3:不要修改外部状态,也不要共享可变状态。
规则 4:不要猜测;测量代码的性能。