本文是一个有关 JVM 并发性的新系列第一篇,将介绍 Java 7 中最新的并发性编程功能,还将介绍一些 Scala 增强。本文还为帮助您理解 Java 8 中的并发性特性扫清了障碍。
处理器速度数十年来一直持续快速发展,并在世纪交替之际走到了终点。从那时起,处理器制造商更多地是通过增加核心来提高芯片性能,而不再通过增加时钟速率来提高芯片性能。多核系统现在成为了从手机到企业服务器等所有设备的标准,而这种趋势可能继续并有所加速。开发人员越来越需要在他们的应用程序代码中支持多个核心,这样才能满足性能需求。
在本系列文章中,您将了解一些针对 Java 和 Scala 语言的并发编程的新方法,包括 Java 如何将 Scala 和其他基于 JVM 的语言中已经探索出来的理念结合在一起。第一期文章将介绍一些背景,通过介绍 Java 7 和 Scala 的一些最新技术,帮助了解 JVM 上的并发编程的全景。您将了解如何使用 Java ExecutorService 和 ForkJoinPool 类来简化并发编程。还将了解一些将并发编程选项扩展到纯 Java 中的已有功能之外的基本 Scala 特性。在此过程中,您会看到不同的方法对并发编程性能有何影响。后续几期文章将会介绍 Java 8 中的并发性改进和一些扩展,包括用于执行可扩展的 Java 和 Scala 编程的 Akka 工具包。
Java 并发性支持
在 Java 平台诞生之初,并发性支持就是它的一个特性,线程和同步的实现为它提供了超越其他竞争语言的优势。Scala 基于 Java 并在 JVM 上运行,能够直接访问所有 Java 运行时(包括所有并发性支持)。所以在分析 Scala 特性之前,我首先会快速回顾一下 Java 语言已经提供的功能。
Java 线程基础
在 Java 编程过程中创建和使用线程非常容易。它们由 java.lang.Thread 类表示,线程要执行的代码为 java.lang.Runnable 实例的形式。如果需要的话,可以在应用程序中创建大量线程,您甚至可以创建数千个线程。在有多个核心时,JVM 使用它们来并发执行多个线程;超出核心数量的线程会共享这些核心。
线程操作的协调难以让人理解。只要从程序的角度让所有内容保持一致,Java 编译器和 JVM 就不会对您代码中的操作重新排序,这使得问题变得更加复杂。例如:如果两个相加操作使用了不同的变量,编译器或 JVM 可以安装与指定的顺序相反的顺序执行这些操作,只要程序不在两个操作都完成之前使用两个变量的总数。这种重新排序操作的灵活性有助于提高 Java 性能,但一致性只被允许应用在单个线程中。硬件也有可能带来线程问题。现代系统使用了多种缓存内存级别,一般来讲,不是系统中的所有核心都能同样看到这些缓存。当某个核心修改内存中的一个值时,其他核心可能不会立即看到此更改。
由于这些问题,在一个线程使用另一个线程修改的数据时,您必须显式地控制线程交互方式。Java 使用了特殊的操作来提供这种控制,在不同线程看到的数据视图中建立顺序。基本操作是,线程使用 synchronized 关键字来访问一个对象。当某个线程在一个对象上保持同步时,该线程将会获得此对象所独有的一个锁的独占访问。如果另一个线程已持有该锁,等待获取该锁的线程必须等待,或者被阻塞,直到该锁被释放。当该线程在一个 synchronized 代码块内恢复执行时,Java 会保证该线程可以 “看到了” 以前持有同一个锁的其他线程写入的所有数据,但只是这些线程通过离开自己的 synchronized 锁来释放该锁之前写入的数据。这种保证既适用于编译器或 JVM 所执行的操作的重新排序,也适用于硬件内存缓存。一个 synchronized 块的内部是您代码中的一个稳定性孤岛,其中的线程可依次安全地执行、交互和共享信息。
在变量上对 volatile 关键字的使用,为线程间的安全交互提供了一种稍微较弱的形式。synchronized 关键字可确保在您获取该锁时可以看到其他线程的存储,而且在您之后,获取该锁的其他线程也会看到您的存储。volatile 关键字将这一保证分解为两个不同的部分。如果一个线程向 volatile 变量写入数据,那么首先将会擦除它在这之前写入的数据。如果某个线程读取该变量,那么该线程不仅会看到写入该变量的值,还会看到写入的线程所写入的其他所有值。所以读取一个 volatile 变量会提供与输入 一个 synchronized 块相同的内存保证,而且写入一个 volatile 变量会提供与离开 一个 synchronized 块相同的内存保证。但二者之间有很大的差别:volatile 变量的读取或写入绝不会受阻塞。
抽象 Java 并发性
同步很有用,而且许多多线程应用程序都是在 Java 中仅使用基本的 synchronized 块开发出来的。但协调线程可能很麻烦,尤其是在处理许多线程和许多块的时候。确保线程仅在安全的方式下交互并 避免潜在的死锁(两个或更多线程等待对方释放锁之后才能继续执行),这很困难。支持并发性而不直接处理线程和锁的抽象,这为开发人员提供了处理常见用例的更好方法。
java.util.concurrent 分层结构包含一些集合变形,它们支持并发访问、针对原子操作的包装器类,以及同步原语。这些类中的许多都是为支持非阻塞访问而设计的,这避免了死锁的问题,而且实现了更高效的线程。这些类使得定义和控制线程之间的交互变得更容易,但他们仍然面临着基本线程模型的一些复杂性。
java.util.concurrent 包中的一对抽象,支持采用一种更加分离的方法来处理并发性:Future<T> 接口、Executor 和 ExecutorService 接口。这些相关的接口进而成为了对 Java 并发性支持的许多 Scala 和 Akka 扩展的基础,所以更详细地了解这些接口和它们的实现是值得的。
Future<T> 是一个 T 类型的值的持有者,但奇怪的是该值一般在创建 Future 之后才能使用。正确执行一个同步操作后,才会获得该值。收到 Future 的线程可调用方法来:
查看该值是否可用 等待该值变为可用 在该值可用时获取它 如果不再需要该值,则取消该操作
Future 的具体实现结构支持处理异步操作的不同方式。
Executor 是一种围绕某个执行任务的东西的抽象。这个 “东西” 最终将是一个线程,但该接口隐藏了该线程处理执行的细节。Executor 本身的适用性有限,ExecutorService 子接口提供了管理终止的扩展方法,并为任务的结果生成了 Future。Executor 的所有标准实现还会实现 ExecutorService,所以实际上,您可以忽略根接口。
线程是相对重量级的资源,而且与分配并丢弃它们相比,重用它们更有意义。ExecutorService 简化了线程间的工作共享,还支持自动重用线程,实现了更轻松的编程和更高的性能。ExecutorService 的 ThreadPoolExecutor 实现管理着一个执行任务的线程池。
应用 Java 并发性
并发性的实际应用常常涉及到需要与您的主要处理逻辑独立的外部交互的任务(与用户、存储或其他系统的交互)。这类应用很难浓缩为一个简单的示例,所以在演示并发性的时候,人们通常会使用简单的计算密集型任务,比如数学计算或排序。我将使用一个类似的示例。
任务是找到离一个未知的输入最近的已知单词,其中的最近 是按照Levenshtein 距离 来定义的:将输入转换为已知的单词所需的最少的字符增加、删除或更改次数。我使用的代码基于 Wikipedia 上的 Levenshtein 距离 文章中的一个示例,该示例计算了每个已知单词的 Levenshtein 距离,并返回最佳匹配值(或者如果多个已知的单词拥有相同的距离,那么返回结果是不确定的)。
清单 1 给出了计算 Levenshtein 距离的 Java 代码。该计算生成一个矩阵,将行和列与两个对比的文本的大小进行匹配,在每个维度上加 1。为了提高效率,此实现使用了一对大小与目标文本相同的数组来表示矩阵的连续行,将这些数组包装在每个循环中,因为我只需要上一行的值就可以计算下一行。
清单 1. Java 中的 Levenshtein 距离计算
/** * Calculate edit distance from targetText to known word. * * @param word known word * @param v0 int array of length targetText.length() + 1 * @param v1 int array of length targetText.length() + 1 * @return distance */private int editDistance(String word, int[] v0, int[] v1) { // initialize v0 (prior row of distances) as edit distance for empty 'word' for (int i = 0; i < v0.length; i++) { v0[i] = i; } // calculate updated v0 (current row distances) from the previous row v0 for (int i = 0; i < word.length(); i++) { // first element of v1 = delete (i+1) chars from target to match empty 'word' v1[0] = i + 1; // use formula to fill in the rest of the row for (int j = 0; j < targetText.length(); j++) { int cost = (word.charAt(i) == targetText.charAt(j)) ? 0 : 1; v1[j + 1] = minimum(v1[j] + 1, v0[j + 1] + 1, v0[j] + cost); } // swap v1 (current row) and v0 (previous row) for next iteration int[] hold = v0; v0 = v1; v1 = hold; } // return final value representing best edit distance return v0[targetText.length()];}
如果有大量已知词汇要与未知的输入进行比较,而且您在一个多核系统上运行,那么您可以使用并发性来加速处理:将已知单词的集合分解为多个块,将每个块作为一个独立任务来处理。通过更改每个块中的单词数量,您可以轻松地更改任务分解的粒度,从而了解它们对总体性能的影响。清单 2 给出了分块计算的 Java 代码,摘自 示例代码 中的 ThreadPoolDistance 类。清单 2 使用一个标准的 ExecutorService,将线程数量设置为可用的处理器数量。
清单 2. 在 Java 中通过多个线程来执行分块的距离计算
private final ExecutorService threadPool;private final String[] knownWords;private final int blockSize;public ThreadPoolDistance(String[] words, int block) { threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); knownWords = words; blockSize = block;}public DistancePair bestMatch(String target) { // build a list of tasks for matching to ranges of known words List<DistanceTask> tasks = new ArrayList<DistanceTask>(); int size = 0; for (int base = 0; base < knownWords.length; base += size) { size = Math.min(blockSize, knownWords.length - base); tasks.add(new DistanceTask(target, base, size)); } DistancePair best; try { // pass the list of tasks to the executor, getting back list of futures List<Future<DistancePair>> results = threadPool.invokeAll(tasks); // find the best result, waiting for each future to complete best = DistancePair.WORST_CASE; for (Future<DistancePair> future: results) { DistancePair result = future.get(); best = DistancePair.best(best, result); } } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } return best;}/** * Shortest distance task implementation using Callable. */public class DistanceTask implements Callable<DistancePair>{ private final String targetText; private final int startOffset; private final int compareCount; public DistanceTask(String target, int offset, int count) { targetText = target; startOffset = offset; compareCount = count; } private int editDistance(String word, int[] v0, int[] v1) { ... } /* (non-Javadoc) * @see java.util.concurrent.Callable#call() */ @Override public DistancePair call() throws Exception { // directly compare distances for comparison words in range int[] v0 = new int[targetText.length() + 1]; int[] v1 = new int[targetText.length() + 1]; int bestIndex = -1; int bestDistance = Integer.MAX_VALUE; boolean single = false; for (int i = 0; i < compareCount; i++) { int distance = editDistance(knownWords[i + startOffset], v0, v1); if (bestDistance > distance) { bestDistance = distance; bestIndex = i + startOffset; single = true; } else if (bestDistance == distance) { single = false; } } return single ? new DistancePair(bestDistance, knownWords[bestIndex]) : new DistancePair(bestDistance); }}
清单 2 中的 bestMatch() 方法构造一个 DistanceTask 距离列表,然后将该列表传递给 ExecutorService。这种对 ExecutorService 的调用形式将会接受一个 Collection<? extends Callable<T>> 类型的参数,该参数表示要执行的任务。该调用返回一个 Future<T> 列表,用它来表示执行的结果。ExecutorService 使用在每个任务上调用 call() 方法所返回的值,异步填写这些结果。在本例中,T 类型为 DistancePair— 一个表示距离和匹配的单词的简单的值对象,或者在没有找到惟一匹配值时近表示距离。
bestMatch() 方法中执行的原始线程依次等待每个 Future 完成,累积最佳的结果并在完成时返回它。通过多个线程来处理 DistanceTask 的执行,原始线程只需等待一小部分结果。剩余结果可与原始线程等待的结果并发地完成。
并发性性能
要充分利用系统上可用的处理器数量,必须为 ExecutorService 配置至少与处理器一样多的线程。您还必须将至少与处理器一样多的任务传递给 ExecutorService 来执行。实际上,您或许希望拥有比处理器多得多的任务,以实现最佳的性能。这样,处理器就会繁忙地处理一个接一个的任务,近在最后才空闲下来。但是因为涉及到开销(在创建任务和 future 的过程中,在任务之间切换线程的过程中,以及最终返回任务的结果时),您必须保持任务足够大,以便开销是按比例减小的。
图 1 展示了我在使用 Oracle 的 Java 7 for 64-bit Linux® 的四核 AMD 系统上运行测试代码时测量的不同任务数量的性能。每个输入单词依次与 12,564 个已知单词相比较,每个任务在一定范围的已知单词中找到最佳的匹配值。全部 933 个拼写错误的输入单词会重复运行,每轮运行之间会暂停片刻供 JVM 处理,该图中使用了 10 轮运行后的最佳时间。从图 1 中可以看出,每秒的输入单词性能在合理的块大小范围内(基本来讲,从 256 到大于 1,024)看起来是合理的,只有在任务变得非常小或非常大时,性能才会极速下降。对于块大小 16,384,最后的值近创建了一个任务,所以显示了单线程性能。
图 1. ThreadPoolDistance 性能
Fork-Join
Java 7 引入了 ExecutorService 的另一种实现:ForkJoinPool 类。ForkJoinPool 是为高效处理可反复分解为子任务的任务而设计的,它使用 RecursiveAction 类(在任务未生成结果时)或 RecursiveTask<T> 类(在任务具有一个 T 类型的结果时)来处理任务。RecursiveTask<T> 提供了一种合并子任务结果的便捷方式,如清单 3 所示。
清单 3. RecursiveTask<DistancePair> 示例
private ForkJoinPool threadPool = new ForkJoinPool();private final String[] knownWords;private final int blockSize;public ForkJoinDistance(String[] words, int block) { knownWords = words; blockSize = block;}public DistancePair bestMatch(String target) { return threadPool.invoke(new DistanceTask(target, 0, knownWords.length, knownWords));}/** * Shortest distance task implementation using RecursiveTask. */public class DistanceTask extends RecursiveTask<DistancePair>{ private final String compareText; private final int startOffset; private final int compareCount; private final String[] matchWords; public DistanceTask(String from, int offset, int count, String[] words) { compareText = from; startOffset = offset; compareCount = count; matchWords = words; } private int editDistance(int index, int[] v0, int[] v1) { ... } /* (non-Javadoc) * @see java.util.concurrent.RecursiveTask#compute() */ @Override protected DistancePair compute() { if (compareCount > blockSize) { // split range in half and find best result from bests in each half of range int half = compareCount / 2; DistanceTask t1 = new DistanceTask(compareText, startOffset, half, matchWords); t1.fork(); DistanceTask t2 = new DistanceTask(compareText, startOffset + half, compareCount - half, matchWords); DistancePair p2 = t2.compute(); return DistancePair.best(p2, t1.join()); } // directly compare distances for comparison words in range int[] v0 = new int[compareText.length() + 1]; int[] v1 = new int[compareText.length() + 1]; int bestIndex = -1; int bestDistance = Integer.MAX_VALUE; boolean single = false; for (int i = 0; i < compareCount; i++) { int distance = editDistance(i + startOffset, v0, v1); if (bestDistance > distance) { bestDistance = distance; bestIndex = i + startOffset; single = true; } else if (bestDistance == distance) { single = false; } } return single ? new DistancePair(bestDistance, knownWords[bestIndex]) : new DistancePair(bestDistance); }}
图 2 显示了清单 3 中的 ForkJoin 代码与 清单 2 中的 ThreadPool 代码的性能对比。ForkJoin 代码在所有块大小中稳定得多,仅在您只有单个块(意味着执行是单线程的)时性能会显著下降。标准的 ThreadPool 代码仅在块大小为 256 和 1,024 时会表现出更好的性能。
图 2. ThreadPoolDistance 与 ForkJoinDistance 的性能对比
这些结果表明,如果可调节应用程序中的任务大小来实现最佳的性能,那么使用标准 ThreadPool 比 ForkJoin 更好。但请注意,ThreadPool 的 “最佳性能点” 取决于具体任务、可用处理器数量以及您系统的其他因素。一般而言,ForkJoin 以最小的调优需求带来了优秀的性能,所以最好尽可能地使用它。