线程执行者(十一)执行者分离任务的启动和结果的处理

执行者分离任务的启动和结果的处理

通常,当你使用执行者执行并发任务时,你将会提交 Runnable或Callable任务给这个执行者,并获取Future对象控制这个方法。你可以发现这种情况,你需要提交任务给执行者在一个对象中,而处理结果在另一个对象中。基于这种情况,Java并发API提供CompletionService类。

CompletionService 类有一个方法来提交任务给执行者和另一个方法来获取已完成执行的下个任务的Future对象。在内部实现中,它使用Executor对象执行任务。这种行为的优点是共享一个CompletionService对象,并提交任务给执行者,这样其他(对象)可以处理结果。其局限性是,第二个对象只能获取那些已经完成它们的执行的任务的Future对象,所以,这些Future对象只能获取任务的结果。

在这个指南中,你将学习如何使用CompletionService类把执行者启动任务和处理它们的结果分开。

准备工作…

这个指南的例子使用Eclipse IDE实现。如果你使用Eclipse或其他IDE,如NetBeans,打开它并创建一个新的Java项目。

如何做…

按以下步骤来实现的这个例子:

1.创建ReportGenerator类,并指定其实现Callable接口,参数化为String类型。

1 public class ReportGenerator implements Callable<String> {

2.声明两个私有的、String类型的属性,sender和title,用来表示报告的数据。

1 private String sender;
2 private String title;

3.实现这个类的构造器,初始化这两个属性。

1 public ReportGenerator(String sender, String title){
2 this.sender=sender;
3 this.title=title;
4 }

4.实现call()方法。首先,让线程睡眠一段随机时间。

1 @Override
2 public String call() throws Exception {
3 try {
4 Long duration=(long)(Math.random()*10);
5 System.out.printf("%s_%s: ReportGenerator: Generating a report during %d seconds\n",this.sender,this.title,duration);
6 TimeUnit.SECONDS.sleep(duration);
7 } catch (InterruptedException e) {
8 e.printStackTrace();
9 }

5.然后,生成一个有sender和title属性的字符串的报告,返回这个字符串。

1 String ret=sender+": "+title;
2 return ret;
3 }

6.创建ReportRequest类,实现Runnable接口。这个类将模拟一些报告请求。

1 public class ReportRequest implements Runnable {

7.声明私有的、String类型的属性name,用来存储ReportRequest的名称。

1 private String name;

8.声明私有的、CompletionService类型的属性service。CompletionService接口是个参数化接口,使用String类型参数化它。

1 private CompletionService<String> service;

9.实现这个类的构造器,初始化这两个属性。

1 public ReportRequest(String name, CompletionService<String> service){
2 this.name=name;
3 this.service=service;
4 }

10.实现run()方法。创建1个ReportGenerator对象,并使用submit()方法把它提交给CompletionService对象。

1 @Override
2 public void run() {
3 ReportGenerator reportGenerator=new ReportGenerator(name,"Report");
4 service.submit(reportGenerator);
5 }

11.创建ReportProcessor类。这个类将获取ReportGenerator任务的结果,指定它实现Runnable接口。

1 public class ReportProcessor implements Runnable {

12.声明一个私有的、CompletionService类型的属性service。由于CompletionService接口是个参数化接口,使用String类作为这个CompletionService接口的参数。

1 private CompletionService<String> service;

13.声明一个私有的、boolean类型的属性end。

1 private boolean end;

14.实现这个类的构造器,初始化这两个属性。

1 public ReportProcessor (CompletionService<String> service){
2 this.service=service;
3 end=false;
4 }

15.实现run()方法。当属性end值为false,调用CompletionService接口的poll()方法,获取CompletionService执行的下个已完成任务的Future对象。

1 @Override
2 public void run() {
3 while (!end){
4 try {
5 Future<String> result=service.poll(20, TimeUnit.SECONDS);

16.然后,使用Future对象的get()方法获取任务的结果,并且将这些结果写入到控制台。

01 if (result!=null) {
02 String report=result.get();
03 System.out.printf("ReportReceiver: Report Received:%s\n",report);
04 }
05 } catch (InterruptedException | ExecutionException e) {
06 e.printStackTrace();
07 }
08 }
09 System.out.printf("ReportSender: End\n");
10 }

17.实现setEnd()方法,用来修改属性end的值。

1 public void setEnd(boolean end) {
2 this.end = end;
3 }

18.实现这个示例的主类,通过创建Main类,并实现main()方法。

1 public class Main {
2 public static void main(String[] args) {

19.使用Executors类的newCachedThreadPool()方法创建ThreadPoolExecutor。

1 ExecutorService executor=(ExecutorService)Executors.newCachedThreadPool();

20.创建CompletionService,使用前面创建的执行者作为构造器的参数。

1 CompletionService<String> service=new ExecutorCompletionService<>(executor);

21.创建两个ReportRequest对象,并用线程执行它们。

1 ReportRequest faceRequest=new ReportRequest("Face", service);
2 ReportRequest onlineRequest=new ReportRequest("Online";,service);
3 Thread faceThread=new Thread(faceRequest);
4 Thread onlineThread=new Thread(onlineRequest);

22.创建一个ReportProcessor对象,并用线程执行它。

1 ReportProcessor processor=new ReportProcessor(service);
2 Thread senderThread=new Thread(processor);

23.启动这3个线程。

1 System.out.printf("Main: Starting the Threads\n");
2 faceThread.start();
3 onlineThread.start();
4 senderThread.start();

24.等待ReportRequest线程的结束。

1 try {
2 System.out.printf("Main: Waiting for the report
3 generators.\n");
4 faceThread.join();
5 onlineThread.join();
6 } catch (InterruptedException e) {
7 e.printStackTrace();
8 }

25.使用shutdown()方法关闭执行者,使用awaitTermination()方法等待任务的结果。

1 System.out.printf("Main: Shutting down the executor.\n");
2 executor.shutdown();
3 try {
4 executor.awaitTermination(1, TimeUnit.DAYS);
5 } catch (InterruptedException e) {
6 e.printStackTrace();
7 }

26.设置ReportSender对象的end属性值为true,结束它的执行。

1 processor.setEnd(true);
2 System.out.println("Main: Ends");

这是如何工作的…

在示例的主类中,你使用Executors类的newCachedThreadPool()方法创建ThreadPoolExecutor。然后,使用这个对象初始化一个CompletionService对象,因为CompletionService需要使用一个执行者来执行任务。利用CompletionService执行一个任务,你需要使用submit()方法,如在ReportRequest类中。

当其中一个任务被执行,CompletionService完成这个任务的执行时,这个CompletionService在一个队列中存储Future对象来控制它的执行。poll()方法用来查看这个列队,如果有任何任务执行完成,那么返回列队的第一个元素,它是一个已完成任务的Future对象。当poll()方法返回一个Future对象时,它将这个Future对象从队列中删除。这种情况下,你可以传两个属性给那个方法,表明你想要等任务结果的时间,以防队列中的已完成任务的结果是空的。

一旦CompletionService对象被创建,你创建2个ReportRequest对象,用来执行3个ReportGenerator任务,每个都在CompletionService中,和一个ReportSender任务,它将会处理已提交给2个ReportRequest对象的任务所产生的结果。

不止这些…

CompletionService类可以执行Callable和Runnable任务。在这个示例中,你已经使用Callable,但你同样可以提交Runnable对象。由于Runnable对象不会产生结果,CompletionService类的理念不适用于这些情况。

这个类同样提供其他两个方法,用来获取已完成任务的Future对象。这两个方法如下:

  • poll():不带参数版本的poll()方法,检查是否有任何Future对象在队列中。如果列队是空的,它立即返回null。否则,它返回第一个元素,并从列队中删除它。
  • take():这个方法,不带参数。检查是否有任何Future对象在队列中。如果队列是空的,它阻塞线程直到队列有一个元素。当队列有元素,它返回第一元素,并从列队中删除它。

参见

  • 在第4章,线程执行者中的执行者执行返回结果的任务指南
时间: 2024-10-28 19:27:53

线程执行者(十一)执行者分离任务的启动和结果的处理的相关文章

第四章 线程执行者(一)引言

在这个章节中,我们将覆盖: 创建一个线程执行者 创建一个大小固定的线程执行者 执行者运行带有返回结果的任务 运行多个任务并处理第一个结果 运行多个任务并处理所有结果 执行者延迟运行一个任务 执行者周期性地运行一个任务 执行者取消一个任务 执行者控制一个任务完成 执行者分离任务的启动和结果的处理 执行者控制被拒绝的任务 引言 通常,当你在Java中开发一个简单的并发编程应用程序,你会创建一些Runnable对象并创建相应的Thread对象来运行它们.如果你开发一个运行多个并发任务的程序,这种途径的

线程执行者(九)执行者取消一个任务

执行者取消一个任务 当你使用执行者工作时,你不得不管理线程.你只实现Runnable或 Callable任务和把它们提交给执行者.执行者负责创建线程,在线程池中管理它们,当它们不需要时,结束它们.有时候,你想要取消已经提交给执行者 的任务.在这种情况下,你可以使用Future的cancel()方法,它允许你做取消操作.在这个指南中,你将学习如何使用这个方法来取消已经提交给执行者的任务. 准备工作- 这个指南的例子使用Eclipse IDE实现.如果你使用Eclipse或其他IDE,如NetBea

线程执行者(三)创建一个大小固定的线程执行者

创建一个大小固定的线程执行者 当你使用由Executors类的 newCachedThreadPool()方法创建的基本ThreadPoolExecutor,你会有执行者运行在某一时刻的线程数的问题.这个执行者为每个接收到的任务创建一个线程(如果池中没有空闲的线程),所以,如果你提交大量的任务,并且它们有很长的(执行)时间,你会使系统过载和引发应用程序性能不佳的问题. 如果你想要避免这个问题,Executors类提供一个方法来创建大小固定的线程执行者.这个执行者有最大线程数. 如果你提交超过这个

线程执行者(二)创建一个线程执行者

创建一个线程执行者 使用Executor framework的第一步就是创建一个ThreadPoolExecutor类的对象.你可以使用这个类提供的4个构造器或Executors工厂类来 创建ThreadPoolExecutor.一旦有执行者,你就可以提交Runnable或Callable对象给执行者来执行. 在这个指南中,你将会学习如何使用这两种操作来实现一个web服务器的示例,这个web服务器用来处理各种客户端请求. 准备工作 你应该事先阅读第1章中创建和运行线程的指南,了解Java中线程创

线程执行者(六)运行多个任务并处理所有结果

运行多个任务并处理所有结果 执行者框架允许你在不用担心线程创建和执行的情况下,并发的执行任务.它还提供了Future类,这个类可以用来控制任务的状态,也可以用来获得执行者执行任务的结果. 如果你想要等待一个任务完成,你可以使用以下两种方法: 如果任务执行完成,Future接口的isDone()方法将返回true. ThreadPoolExecutor类的awaitTermination()方法使线程进入睡眠,直到每一个任务调用shutdown()方法之后完成执行. 这两种方法都有一些缺点.第一个

线程是否启动的问题

问题描述 classMyThreadimplementsRunnable{//实现Runnable接口publicvoidrun(){//覆写run()方法for(inti=0;i<3;i++){System.out.println(Thread.currentThread().getName()+"运行,i="+i);//取得当前线程的名字}}};publicclassThreadAliveDemo{publicstaticvoidmain(Stringargs[]){MyTh

详解Java多线程编程中线程的启动、中断或终止操作_java

线程启动: 1.start() 和 run()的区别说明start() : 它的作用是启动一个新线程,新线程会执行相应的run()方法.start()不能被重复调用. run() : run()就和普通的成员方法一样,可以被重复调用.单独调用run()的话,会在当前线程中执行run(),而并不会启动新线程! 下面以代码来进行说明. class MyThread extends Thread{ public void run(){ ... } }; MyThread mythread = new

[转载]Linux 线程实现机制分析

  自从多线程编程的概念出现在 Linux 中以来,Linux 多线应用的发展总是与两个问题脱不开干系:兼容性.效率.本文从线程模型入手,通过分析目前 Linux 平台上最流行的 LinuxThreads 线程库的实现及其不足,描述了 Linux 社区是如何看待和解决兼容性和效率这两个问题的.   一.基础知识:线程和进程 按照教科书上的定义,进程是资源管理的最小单位,线程是程序执行的最小单位.在操作系统设计上,从进程演化出线程,最主要的目的就是更好的支持SMP以及减小(进程/线程)上下文切换开

Linux中线程使用详解

Linux下多线程详解pdf文档下载:点击这里! 线程与进程为什么有了进程的概念后,还要再引入线程呢?使用多线程到底有哪些好处?什么的系统应该选用多线程?我们首先必须回答这些问题. 使用多线程的理由之一是和进程相比,它是一种非常"节俭"的多任务操作方式.我们知道,在Linux系统下,启动一个新的进程必须分配给它独立的地址空间,建立众多的数据表来维护它的代码段.堆栈段和数据段,这是一种"昂贵"的多任务工作方式.而运行于一个进程中的多个线程,它们彼此之间使用相同的地址空