Fork/Join框架(二)创建一个Fork/Join池

创建一个Fork/Join池

在这个指南中,你将学习如何使用Fork/Join框架的基本元素。它包括:

  • 创建一个ForkJoinPool对象来执行任务。
  • 创建一个ForkJoinPool执行的ForkJoinTask类。

你将在这个示例中使用Fork/Join框架的主要特点,如下:

  • 你将使用默认构造器创建ForkJoinPool。
  • 在这个任务中,你将使用Java API文档推荐的结构:
1 If (problem size < default size){
2 tasks=divide(task);
3 execute(tasks);
4 } else {
5 resolve problem using another algorithm;
6 }
  • 你将以一种同步方式执行任务。当一个任务执行2个或2个以上的子任务时,它将等待它们的结束。通过这种方式 ,正在执行这些任务的线程(工作线程)将会查找其他任务(尚未执行的任务)来执行,充分利用它们的执行时间。
  • 你将要实现的任务将不会返回任何结果,所以你将使用RecursiveAction作为它们实现的基类。

准备工作

这个指南中的例子使用Eclipse IDE实现。如果你使用Eclipse或其他IDE,如NetBeans,打开它并创建一个新的Java项目。

如何做…

在这个指南中,你将继续实现一个任务来修改产品列表的价格。任务最初是负责更新一个队列中的所有元素。你将会使用10作为参考大小,如果一个任务必须更新超过10个元素,这些元素将被划分成两个部分,并创建两个任务来更新每个部分中的产品的价格。

按以下步骤来实现这个示例:

1.创建类Product,将用来存储产品的名称和价格。

1 public class Product {

2.声明一个私有的String类型的属性name和一个私有的double类型的属性price。

1 private String name;
2 private double price;

3.实现这些方法,用来设置和获取这两个属性的值。

01 public String getName() {
02 return name;
03 }
04 public void setName(String name) {
05 this.name = name;
06 }
07 public double getPrice() {
08 return price;
09 }
10 public void setPrice(double price) {
11  
12 this.price = price;
13 }

4.创建ProductListGenerator类,用来产生随机产品的数列。

1 public class ProductListGenerator {

5.实现generate()方法。它接收一个数列大小 的int类型参数,返回一个产生产品数列的List<Product>对象。

1 public List<Product> generate (int size) {

6.创建返回产品数列的对象。

1 List<Product> ret=new ArrayList<Product>();

7.创建产品队列。给所有产品赋予相同值。比如,10用来检查程序是否工作得很好。

1 for (int i=0; i<size; i++){
2 Product product=new Product();
3 product.setName("Product"+i);
4 product.setPrice(10);
5 ret.add(product);
6 }
7 return ret;
8 }

8.创建Task类,指定它继承RecursiveAction类。

1 public class Task extends RecursiveAction {

9.声明类的序列版本UID。这个元素是必需的,因为RecursiveAction类的父类ForkJoinTask实现了Serializable接口。

1 private static final long serialVersionUID = 1L;

10.声明一个私有的、List<Product>类型的属性products。

1 private List&lt;Product&gt; products;

11.声明两个私有的、int类型的属性first和last。这些属性将决定这个任务产品的阻塞过程。

1 private int first;
2 private int last;

12.声明一个私有的、double类型的属性increment,用来存储产品价格的增长。

1 private double increment;

13.实现这个类的构造器,初始化所有属性。

1 public Task (List&lt;Product&gt; products, int first, int last, double increment) {
2 this.products=products;
3 this.first=first;
4 this.last=last;
5 this.increment=increment;
6 }

14.实现compute()方法 ,该方法将实现任务的逻辑。

1 @Override
2 protected void compute() {

15.如果last和first的差小于10(任务只能更新价格小于10的产品),使用updatePrices()方法递增的设置产品的价格。

1 if (last-first<10) {
2 updatePrices();

16.如果last和first的差大于或等于10,则创建两个新的Task对象,一个处理产品的前半部分,另一个处理产品的后半部分,然后在ForkJoinPool中,使用invokeAll()方法执行它们。

1 } else {
2 int middle=(last+first)/2;
3 System.out.printf("Task: Pending tasks:
4 %s\n",getQueuedTaskCount());
5 Task t1=new Task(products, first,middle+1, increment);
6 Task t2=new Task(products, middle+1,last, increment);
7 invokeAll(t1, t2);
8 }

17.实现updatePrices()方法。这个方法更新产品队列中位于first值和last值之间的产品。

1 private void updatePrices() {
2 for (int i=first; i<last; i++){
3 Product product=products.get(i);
4 product.setPrice(product.getPrice()*(1+increment));
5 }
6 }

18.实现这个示例的主类,通过创建Main类,并实现main()方法。

1 public class Main {
2 public static void main(String[] args) {

19.使用ProductListGenerator类创建一个包括10000个产品的数列。

1 ProductListGenerator generator=new ProductListGenerator();
2 List<Product> products=generator.generate(10000);

20.创建一个新的Task对象,用来更新产品队列中的产品。first参数使用值0,last参数使用值10000(产品数列的大小)。

1 Task task=new Task(products, 0, products.size(), 0.20);

21.使用无参构造器创建ForkJoinPool对象。

1 ForkJoinPool pool=new ForkJoinPool();

22.在池中使用execute()方法执行这个任务 。

1 pool.execute(task);

23.实现一个显示关于每隔5毫秒池中的变化信息的代码块。将池中的一些参数值写入到控制台,直到任务完成它的执行。

01 do {
02 System.out.printf("Main: Thread Count: %d\n",pool.getActiveThreadCount());
03 System.out.printf("Main: Thread Steal: %d\n",pool.getStealCount());
04 System.out.printf("Main: Parallelism: %d\n",pool.getParallelism());
05 try {
06 TimeUnit.MILLISECONDS.sleep(5);
07 } catch (InterruptedException e) {
08 e.printStackTrace();
09 }
10 } while (!task.isDone());

24.使用shutdown()方法关闭这个池。

1 pool.shutdown();

25.使用isCompletedNormally()方法检查假设任务完成时没有出错,在这种情况下,写入一条信息到控制台。

1 if (task.isCompletedNormally()){
2 System.out.printf("Main: The process has completed
3 normally.\n");
4 }

26.在增长之后,所有产品的价格应该是12。将价格不是12的所有产品的名称和价格写入到控制台,用来检查它们错误地增长它们的价格。

1 for (int i=0; i<products.size(); i++){
2 Product product=products.get(i);
3 if (product.getPrice()!=12) {
4 System.out.printf("Product %s: %f\n",product.getName(),product.getPrice());
5 }
6 }

27.写入一条信息到控制台表明程序的结束。

1 System.out.println("Main: End of the program.\n");

它是如何工作的…

在这个示例中,你已经创建一个ForkJoinPool对象和一个在池中执行的ForkJoinTask类的子类。为了创建ForkJoinPool对象,你已经使用了无参构造器,所以它会以默认的配置来执行。它创建一个线程数等于计算机处理器数的池。当ForkJoinPool对象被创建时,这些线程被创建并且在池中等待,直到有任务到达让它们执行。

由于Task类没有返回结果,所以它继承RecursiveAction类。在这个指南中,你已经使用了推荐的结构来实现任务。如果这个任务更新超过10产品,它将被分解成两部分,并创建两个任务,一个任务执行一部分。你已经在Task类中使用first和last属性,用来了解这个任务要更新的产品队列的位置范围。你已经使用first和last属性,只复制产品数列一次,而不是为每个任务创建不同的数列。

它调用invokeAll()方法,执行每个任务所创建的子任务。这是一个同步调用,这个任务在继续(可能完成)它的执行之前,必须等待子任务的结束。当任务正在等待它的子任务(结束)时,正在执行它的工作线程执行其他正在等待的任务。在这种行为下,Fork/Join框架比Runnable和Callable对象本身提供一种更高效的任务管理。

ForkJoinTask类的invokeAll()方法是执行者(Executor)和Fork/Join框架的一个主要区别。在执行者框架中,所有任务被提交给执行者,而在这种情况下,这些任务包括执行和控制这些任务的方法都在池内。你已经在Task类中使用invokeAll()方法,它是继承了继承ForkJoinTask类的RecursiveAction类。

你使用execute()方法提交唯一的任务给这个池,用来所有产品数列。在这种情况下,它是一个异步调用,而主线程继续它的执行。

你已经使用ForkJoinPool类的一些方法,用来检查正在运行任务的状态和变化。基于这个目的,这个类包括更多的方法。参见有这些方法完整列表的监控一个Fork/Join池指南。

最后,与执行者框架一样,你应该使用shutdown()方法结束ForkJoinPool。
 以下截图显示这个示例执行的一部分:

你可以看出任务正在完成它们的工作和产品价格的更新。

不止这些…

ForkJoinPool类提供其他的方法,用来执行一个任务。这些方法如下:

  • execute (Runnable task):这是在这个示例中,使用的execute()方法的另一个版本。在这种情况下,你可以提交一个Runnable对象给ForkJoinPool类。注意:ForkJoinPool类不会对Runnable对象使用work-stealing算法。它(work-stealing算法)只用于ForkJoinTask对象。
  • invoke(ForkJoinTask<T> task):当execute()方法使用一个异步调用ForkJoinPool类,正如你在本示例中所学的,invoke()方法使用同步调用ForkJoinPool类。这个调用不会(立即)返回,直到传递的参数任务完成它的执行。
  • 你也可以使用在ExecutorService接口的invokeAll()和invokeAny()方法。这些方法接收一个Callable对象作为参数。ForkJoinPool类不会对Callable对象使用work-stealing算法,所以你最好使用执行者去执行它们。

ForkJoinTask类同样提供在示例中使用的invokeAll()的其他版本。这些版本如下:

  • invokeAll(ForkJoinTask<?>… tasks):这个版本的方法使用一个可变参数列表。你可以传入许多你想要执行的ForkJoinTask对象作为参数。
  • invokeAll(Collection<T> tasks):这个版本的方法接收一个泛型类型T对象的集合(如:一个ArrayList对象,一个LinkedList对象或者一个TreeSet对象)。这个泛型类型T必须是ForkJoinTask类或它的子类。

即使ForkJoinPool类被设计成用来执行一个ForkJoinTask,你也可以直接执行Runnable和Callable对象。你也可以使用ForkJoinTask类的adapt()方法来执行任务,它接收一个Callable对象或Runnable对象(作为参数)并返回一个ForkJoinTask对象。 参见

  • 在第8章,测试并发应用程序中的监控一个Fork/Join池的指南
时间: 2024-12-07 21:20:05

Fork/Join框架(二)创建一个Fork/Join池的相关文章

ASP.NET 2.0数据教程之六十二:创建一个用户自定义的Database-Driven Site Map

返回"ASP.NET 2.0数据教程目录" ASP.NET 2.0数据教程之六十二:创建一个用户自定义的Database-Driven Site Map Provider 导言: ASP.NET 2.0的网站地图(site map)功能允许页面开发者在一些 持久介质(persistent medium),比如一个XML文件里,自己定义一个web程序的 site map.一旦定义了之后,我们可以通过System.Web命名空间的SiteMap class 类或某个Web导航控件,比如Si

maven(二)创建一个maven的web项目中解决Cannot change version of project facet Dynamic web module to 2.5

我们用Eclipse创建Maven结构的web项目的时候选择了Artifact Id为maven-artchetype-webapp,由于这个catalog比较老,用的servlet还是2.3的,而一般现在至少都是2.5,在 Project Facets里面修改Dynamic web module为2.5的时候就会出现Cannot change version of project facet Dynamic web module to 2.5,如图: 其实在右边可以看到改到2.5需要的条件以及

Fork/Join框架(三)加入任务的结果

加入任务的结果 Fork/Join框架提供了执行返回一个结果的任务的能力.这些任务的类型是实现了RecursiveTask类.这个类继承了ForkJoinTask类和实现了执行者框架提供的Future接口. 在任务中,你必须使用Java API方法推荐的结构: 1 If (problem size < size){ 2 tasks=Divide(task); 3 execute(tasks); 4 groupResults() 5 return result; 6 } else { 7 reso

Java Fork/Join框架_java

Fork/Join框架是ExecutorService接口的一个实现,通过它我们可以实现多进程.Fork/Join可以用来将一个大任务递归的拆分为多个小任务,目标是充分利用所有的资源尽可能增强应用的性能. 和任何ExecutorService接口的实现一样,Fork/Join也会使用线程池来分布式的管理工作线程.Fork/Join框架的独特之处在于它使用了work-stealing(工作窃取)算法.通过这个算法,工作线程在无事可做时可以窃取其它正在繁忙的线程的任务来执行. Fork/Join框架

Fork/Join框架(四)异步运行任务

异步运行任务 当你在ForkJoinPool中执行ForkJoinTask时,你可以使用同步或异步方式来实现.当你使用同步方式时,提交任务给池的方法直到提交的任务完成它的执行,才会返回结果.当你使用异步方式时,提交任务给执行者的方法将立即返回,所以这个任务可以继续执行. 你应该意识到这两个方法有很大的区别,当你使用同步方法,调用这些方法(比如:invokeAll()方法)的任务将被阻塞,直到提交给池的任务完成它的执行.这允许ForkJoinPool类使用work-stealing算法,分配一个新

Fork/Join框架(一)引言

在这个章节中,我们将覆盖: 创建一个Fork/Join池 加入任务的结果 异步运行任务 任务抛出异常 取消任务 引言 通常,当你实现一个简单的并发应用程序,你实现一些Runnable对象和相应的 Thread对象.在你的程序中,你控制这些线程的创建.执行和状态.Java 5引入了Executor和ExecutorService接口及其实现类进行了改进(比如:ThreadPoolExecutor类). 执行者框架将任务的创建与执行分离.有了它,你只要实现Runnable对象和使用Executor对

Fork/Join框架(五)在任务中抛出异常

声明:本文是< Java 7 Concurrency Cookbook>的第五章,作者: Javier Fernández González 译者:许巧辉 校对:方腾飞 在任务中抛出异常 在Java中有两种异常: 已检查异常(Checked exceptions):这些异常必须在一个方法的throws从句中指定或在内部捕捉它们.比如:IOException或ClassNotFoundException. 未检查异常(Unchecked exceptions):这些异常不必指定或捕捉.比如:Nu

Fork/Join框架(六)取消任务

取消任务 当你在一个ForkJoinPool类中执行ForkJoinTask对象,在它们开始执行之前,你可以取消执行它们.ForkJoinTask类提供cancel()方法用于这个目的.当你想要取消一个任务时,有一些点你必须考虑一下,这些点如下: ForkJoinPool类并没有提供任何方法来取消正在池中运行或等待的所有任务. 当你取消一个任务时,你不能取消一个已经执行的任务. 在这个指南中,你将实现取消ForkJoinTask对象的例子.你将查找数在数组中的位置.第一个找到这个数的任务将取消剩

线程执行者(二)创建一个线程执行者

创建一个线程执行者 使用Executor framework的第一步就是创建一个ThreadPoolExecutor类的对象.你可以使用这个类提供的4个构造器或Executors工厂类来 创建ThreadPoolExecutor.一旦有执行者,你就可以提交Runnable或Callable对象给执行者来执行. 在这个指南中,你将会学习如何使用这两种操作来实现一个web服务器的示例,这个web服务器用来处理各种客户端请求. 准备工作 你应该事先阅读第1章中创建和运行线程的指南,了解Java中线程创