哪个线程执行 CompletableFuture’s tasks 和 callbacks?

CompletableFuture尽管在2014年的三月随着Java8被提出来,但它现在仍然是一种相对较新潮的概念。但也许这个类不为人所熟知是好事,因为它很容易被滥用,特别是涉及到使用线程和线程池的时候。而这篇文章的目的就是要描述线程是怎样使用CompletableFuture的。

Running tasks

这是API的基础部分,它有一个很实用的supplyAsync()方法,这个方法和ExecutorService.submit()很像,但不同的是返回CompletableFuture:

1 CompletableFuture.supplyAsync(() -> {
2             try (InputStream is = new URL("http://www.nurkiewicz.com").openStream()) {
3                 log.info("Downloading");
4                 return IOUtils.toString(is, StandardCharsets.UTF_8);
5             catch (IOException e) {
6                 throw new RuntimeException(e);
7             }
8         });

问题是supplyAsync()默认使用 ForkJoinPool.commonPool(),线程池由所有的CompletableFutures分享,所有的并行流和所有的应用都部署在同一个虚拟机上(如果你很不幸的仍在使用有很多人工部署的应用服务器)。这种硬编码的,不可配置的线程池完全超出了我们的控制,很难去监测和度量。因此你应该指定你自己的Executor,就像这里(也可以看看这里几种创造这样Exetutor的方法):

1 ExecutorService pool = Executors.newFixedThreadPool(10);
2  
3 final CompletableFuture future =
4         CompletableFuture.supplyAsync(() -> {
5             //...
6         }, pool);

这仅仅是开始…

Callbacks and transformations

假如你想转换给定的CompletableFuture,例如提取String的长度:

1 CompletableFuture intFuture =
2     future.thenApply(s -> s.length());

那么是谁调用了s.length()?坦白点,我一点也不在乎。只要涉及到lambda表达式,那么所有的执行者像thenApply这样的就是廉价的,我们并不关心是谁调用了lambda表达式。但如果这样的表达式会占用一点点的CPU来完成阻塞的网络通信那又会如何呢?

首先默认情况下会发生什么?试想一下:我们有一个返回String类型的后台任务,当结果完成时我们想要异步地去执行特定的变换。最容易的实现方法是通过包装一个原始的任务(返回String),任务完成时截获它。当内部的task结束后,回调就开始执行,执行变换和返回改进的值。就像有一个面介于我们的代码和初始的计算结果之间(个人看法:这里指的是下面的future里面包含的task执行完毕返回结果s,然后立马执行callback也就是thenApply里面的lambda表达式,这也就是为什么作者说有一个面位于初始计算结果和回调执行代码之间)。那就是说这应该相当明显了,s.length()的变换会在和执行原始任务相同的线程里完成,哈?并不完全是这样!(这里指的是有时候变换的线程和执行原始任务的线程不是同一个线程,看下面就知道)

01 CompletableFuture future =
02         CompletableFuture.supplyAsync(() -> {
03             sleepSeconds(2);
04             return "ABC";
05         }, pool);
06  
07 future.thenApply(s -> {
08     log.info("First transformation");
09     return s.length();
10 });
11  
12 future.get();
13 pool.shutdownNow();
14 pool.awaitTermination(1, TimeUnit.MINUTES);
15  
16 future.thenApply(s -> {
17     log.info("Second transformation");
18     return s.length();
19 });

如果future里面的task还在运行,那么包含first transformation的 thenApply()就会一直处于挂起状态。而这个task完成后thenApply()会立即执行,执行的线程和执行task的线程是同一个。然而在注册第二次变换之前(也就是执行第二个thenApply()),我们将一直等待直到task完成(和第一个变换是一样的,都需要等待)。更坏的情况是,我们完全地关闭了线程池,保证其他的代码将不会执行。那么哪个线程将要执行二次变换呢?我们都知道当注册了callback的future完成时,二次变换必定会立刻执行。这就是说它是使用默认的主线程(来完成callback),上面的代码输出如下:

pool-1-thread-1 | First transformation      main | Second transformation

二次变换在注册的时候就意识到CompletableFuture已经完成了(指的是future里面的task已经返回结果,其实在第一次调用thenApply()之前就已经返回了,所以这一次不用等待task),因此它立刻执行了变换。由于此时已经没有其他的线程,所以thenApply()就只能在当前的main线程环境中被调用。最主要的原因还是因为这种行为机制在实际的变换成本很高时(如很耗时)很容易出错。想象一下thenApply()内部的lambda表达式在进行一些繁重的计算或者阻塞的网络调用,突然我们的异步 CompletableFuture阻塞了调用者线程!

Controlling callback’s thread pool

有两种技术去控制执行回调和变换的线程,需要注意的是这些方法仅仅适用你的变换需要很高成本的时候,其他情况下可以忽略。那么第一个方法可以选择使用操作者的 *Async方法,例如:

1 future.thenApplyAsync(s -> {
2     log.info("Second transformation");
3     return s.length();
4 });

这一次second transformation被自动地卸载到了我们的老朋友线程ForkJoinPool.commonPool()中去了:

1 pool-1-thread-1                  | First transformation
2 ForkJoinPool.commonPool-worker-1 | Second transformation

但我们并不喜欢commonPool,所以我们提供自己的:

1 future.thenApplyAsync(s -> {
2     log.info("Second transformation");
3     return s.length();
4 }, pool2);

注意到这里使用的是不同的线程池(pool-1 vs. pool-2):

1 pool-1-thread-1 | First transformation
2 pool-2-thread-1 | Second transformation

Treating callback like another computation step

我相信如果你在处理一些长时间运行的callbacks和transformations上有些麻烦(记住这篇文章同样也适用于CompletableFuture的其他大部分方法),你应该简单地使用其他表意明确的CompletableFuture,就像这样:

01 //Imagine this is slow and costly
02 CompletableFuture<Integer> strLen(String s) {
03     return CompletableFuture.supplyAsync(
04             () -> s.length(),
05             pool2);
06 }
07  
08 //...
09  
10 CompletableFuture<Integer> intFuture =
11         future.thenCompose(s -> strLen(s));

这种方法更加明确,知道我们的变换有很大的开销,我们不会将它运行在一些随意的不可控的线程上。取而代之的是我们会将String到CompletableFuture<Integer>的变换封装为一个异步操作。然而,我们必须用thenCompose()取代thenApply(),否则的话我们会得到CompletableFuture<CompletableFuture<Integer>>.

但如果我们的transformation 没有一个能够很好地处理嵌套CompletableFuture的形式怎么办,如applyToEither()会等待第一个Future完成然后执行transformation.

1 CompletableFuture<CompletableFuture<Integer>> poor =
2         future1.applyToEither(future2, s -> strLen(s));

这里有个很实用的技巧,用来“展开”这类难以理解的数据结构,这种技巧叫flatten,通过使用flatMap(identity) (or flatMap(x -> x))。在我们的例子中flatMap()就叫做thenCompose:

1 CompletableFuture<Integer> good =
2         poor.thenCompose(x -> x);

我把它留给你,去弄懂它是怎样和为什么这样工作的。我想这篇文章已经尽量清楚地阐述了线程是如何参与到CompletableFuture中去的。

转载自 并发编程网 - ifeve.com

时间: 2024-09-26 00:45:07

哪个线程执行 CompletableFuture’s tasks 和 callbacks?的相关文章

java并发编程学习:如何等待多个线程执行完成后再继续后续处理(synchronized、join、FutureTask、CyclicBarrier)

多线程应用中,经常会遇到这种场景:后面的处理,依赖前面的N个线程的处理结果,必须等前面的线程执行完毕后,后面的代码才允许执行. 在我不知道CyclicBarrier之前,最容易想到的就是放置一个公用的static变量,假如有10个线程,每个线程处理完上去累加下结果,然后后面用一个死循环(或类似线程阻塞的方法),去数这个结果,达到10个,说明大家都爽完了,可以进行后续的事情了,这个想法虽然土鳖,但是基本上跟语言无关,几乎所有主流编程语言都支持. package yjmyzz.test; publi

CUDA线程执行模型分析(二)大军未动粮草先行---GPU的革命

序:今天或许是比较不顺心的一天,从早上第一个电话,到下午的一些列的事情,有些许的失落.有的时候真的很想把工作和生活完全分开,但是谁又能真正的分得那么开,人非草木!很多的时候都想给人生下一些定义,添加一些注释.但是生活本来就是不需要添加注释的自解释的代码.用0来解释?还是用1来解释?0,天地之始,1,万物之源.谁又能说清楚,是先有0,还是先有1,他们本就是同体--要想成事,就应该拿得起,放得下.感叹人生的同时,人生的旅程是不会停止的--手下还有招来的那么多将士,都还等着啦! 正文:书接上回--<C

在Android中使用Handler和Thread线程执行后台操作

大家都知道,在PC上的应用程序当需要进行一些复杂的数据操作,但不需要界面UI的时候 ,我们会为应用程序专门写一个线程去执行这些复杂的数据操作.通过线程,可以执行例如 :数据处理.数据下载等比较耗时的操作,同时对用户的界面不会产生影响.在Android应用 程序开发中,同样会遇到这样的问题.当我们需要访问网络,从网上下载数据并显示在我们 的UI上时,就会启动后台线程去下载数据,下载线程执行完成后将结果返回给主用户界面线 程. 对于线程的控制,我们将介绍一个Handler类,使用该类可以对运行在不同

iOS应用程序中通过dispatch队列控制线程执行的方法_IOS

GCD编程的核心就是dispatch队列,dispatch block的执行最终都会放进某个队列中去进行,它类似NSOperationQueue但更复杂也更强大,并且可以嵌套使用.所以说,结合block实现的GCD,把函数闭包(Closure)的特性发挥得淋漓尽致. dispatch队列的生成可以有这几种方式: 1. dispatch_queue_t queue = dispatch_queue_create("com.dispatch.serial", DISPATCH_QUEUE_

Java多线程--让主线程等待所有子线程执行完毕代码

采用CountDownLatch类来实现     主线程    package test; import java.util.concurrent.CountDownLatch; public class Main {  /**   *   * @author Administrator/2012-3-1/上午09:36:55   */  public static void main(String[] args) {   int threadNum = 10;   CountDownLatch

c# silverlight中线程执行结果问题

问题描述 在silverlight中,以下是代码缩写,望能表达我遇到的问题.我该怎样才能使得Ret被赋值后才返回?怎样才能使得线程执行完主程序才接着走下去呢?publicvoidA(){stringstr;str=B();MessgeBox.show(str);}publicstringB(){stringret;newThread(()=>{//这个函数必须在线程里ret=GetStr();}).Start();returnret;} 解决方案 解决方案二:求帮顶!求解答!谢谢!解决方案三:你

ython-blinker 接收到消息开的线程执行的函数中 创建了一个定时器线程程序就异常 了

问题描述 blinker 接收到消息开的线程执行的函数中 创建了一个定时器线程程序就异常 了 有没有誰碰到过这样的问题,指点一下,急. 我要实现的是从串口上报数据中发现一些主动上报,如收到短信之类的 然后发送消息调用相应的方法去读取串口数据,读取结束靠的是在指定时间内收到特定的字符串如'OK',所以在函数中又生成一个定时器线程来处理超时的情况.可是这样子有URC上报是消息调用的方法总是不能正常执行,还影响了主线程.

关于Java中停止线程执行的方法总结

Java中停止线程执行的方法 一.暂停或停止线程的理论 在Java编程中,要暂停或停止当前正在运行的线程,有几种方法.对于把线程转入睡眠Sleep状态,使用Thread.sleep()是最正确的方式.或许有人会问,为什么不使用等待wait()或通知notify()?要知道,使用等待或通知都不是很好的方式.线程可以使用等待wait()实现被阻塞,这属于条件等待的方式,当条件满足后,又会从阻塞转为等待状态.尽管可以在等待wait()条件那里放一个超时设置,但等待wait()的设计目的不是这样的,等待

Java并发编程示例(六):等待线程执行终止_java

在某些场景下,我们必须等待线程执行完成才能进行下一步工作.例如,某些程序在开始执行之前,需要先初始化一些资源.这时,我们可以启动一个线程专门来做初始化任务,等到线程任务完成后,再去执行其他部分. 为此,Thread类为我们提供了join()方法.当我们使用线程对象调用此方法时,正在掉调用的线程对象将被推迟到被调用对象执行完成后再开始执行. 在本节,示例程序演示等待初始化方法完成后,再去执行其他任务. 知其然 按照下面所示步骤,完成示例程序. 1.创建一个名为DataSourcesLoader的类