​Java并发新构件之Exchanger

    Exchanger是在两个任务之间交换对象的栅栏。当两个任务进入栅栏时,它们各自拥有一个对象,当它们离开时,它们都拥有对方的对象。Exchanger的典型应用场景是:一个任务在创建对象,而这些对象的生产代价很高,另一个任务在消费这些对象。通过这种方式,可以有更多的对象在被创建的同时被消费。

    为了演示Exchanger类,我们将创建生产者和消费者任务。ExchangerProducer和ExchangerConsumer使用一个List<Fat>作为要求交换的对象,它们都包含一个用于这个List<Fat>的Exchanger。当你调用Exchanger.exchange()方法时,它将阻塞直至对方任务调用它自己的exchange()方法,那时,这两个exchange()方法将同时完成,而List<Fat>被交换:

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

import java.util.List;

import java.util.concurrent.CopyOnWriteArrayList;

import java.util.concurrent.Exchanger;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

 

class ExchangerProducer implements Runnable {

    private List<Fat> holder;

    private Exchanger<List<Fat>> exchanger;

    public ExchangerProducer(Exchanger<List<Fat>> exchanger, List<Fat> holder) {

        this.exchanger = exchanger;

        this.holder = holder;

    }

    @Override

    public void run() {

        try {

            while(!Thread.interrupted()) {

                //填充列表

                for (int i = 0;i < ExchangerDemo.size; i++) {

                    holder.add(new Fat());

                }

                //等待交换

                holder = exchanger.exchange(holder);

            }

        catch (InterruptedException e) {

        }

        System.out.println("Producer stopped.");

    }

}

 

class ExchangerConsumer implements Runnable {

    private List<Fat> holder;

    private Exchanger<List<Fat>> exchanger;

    private volatile Fat value;

    private static int num = 0;

    public ExchangerConsumer(Exchanger<List<Fat>> exchanger, List<Fat> holder) {

        this.exchanger = exchanger;

        this.holder = holder;

    }

    @Override

    public void run() {

        try {

            while(!Thread.interrupted()) {

                //等待交换

                holder = exchanger.exchange(holder);

                //读取列表并移除元素

                for (Fat x : holder) {

                    num++;

                    value = x;

                    //在循环内删除元素,这对于CopyOnWriteArrayList是没有问题的

                    holder.remove(x);

                }

                if (num % 10000 == 0) {

                    System.out.println("Exchanged count=" + num);

                }

            }

        catch (InterruptedException e) {

             

        }

        System.out.println("Consumer stopped. Final value: " + value);

    }

}

 

public class ExchangerDemo {

    static int size = 10;

    static int delay = 5//秒

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        List<Fat> producerList = new CopyOnWriteArrayList<>();

        List<Fat> consumerList = new CopyOnWriteArrayList<>();

        Exchanger<List<Fat>> exchanger = new Exchanger<>();

        exec.execute(new ExchangerProducer(exchanger, producerList));

        exec.execute(new ExchangerConsumer(exchanger, consumerList));

        TimeUnit.SECONDS.sleep(delay);

        exec.shutdownNow();

    }

}

 

class Fat {

    private volatile double d;

    private static int counter = 1;

    private final int id = counter++;

    public Fat() {

        //执行一段耗时的操作

        for (int i = 1; i<10000; i++) {

            d += (Math.PI + Math.E) / (double)i;

        }

    }

    public void print() {System.out.println(this);}

    public String toString() {return "Fat id=" + id;}

}

执行结果(可能的结果):

?


1

2

3

4

5

6

7

8

9

10

Exchanged count=10000

Exchanged count=20000

Exchanged count=30000

Exchanged count=40000

Exchanged count=50000

Exchanged count=60000

Exchanged count=70000

Exchanged count=80000

Consumer stopped. Final value: Fat id=88300

Producer stopped.

    在main()中,创建了用于两个任务的单一的Exchanger,以及两个用于互换的CopyOnWriteArrayList。这个特定的List变体允许列表在被遍历的时候调用remove()方法,而不会抛出ConcurrentModifiedException异常。ExchangerProducer将填充这个List,然后将这个满列表跟ExchangerConsumer的空列表交换。交换之后,ExchangerProducer可以继续的生产Fat对象,而ExchangerConsumer则开始使用满列表中的对象。因为有了Exchanger,填充一个列表和消费另一个列表便同时发生了。

时间: 2024-12-25 17:26:35

​Java并发新构件之Exchanger的相关文章

Java并发新构件之PriorityBlockingQueue

An unbounded blocking queue that uses the same ordering rules as class PriorityQueue and supplies blocking retrieval operations. While this queue is logically unbounded, attempted additions may fail due to resource exhaustion (causing OutOfMemoryErro

Java并发新构件之DelayQueue

    DelayQueue主要用于放置实现了Delay接口的对象,其中的对象只能在其时刻到期时才能从队列中取走.这种队列是有序的,即队头的延迟到期时间最短.如果没有任何延迟到期,那么久不会有任何头元素,并且poll()将返回null(正因为这样,你不能将null放置到这种队列中)     下面是一个示例,其中的Delayed对象自身就是任务,而DelayedTaskConsumer将最"紧急"的任务从队列中取出来,然后运行它: ? 1 2 3 4 5 6 7 8 9 10 11 12

Java 并发工具包 java.util.concurrent 用户指南

译序 本指南根据 Jakob Jenkov 最新博客翻译,请随时关注博客更新:http://tutorials.jenkov.com/java-util-concurrent/index.html. 本指南已做成中英文对照阅读版的 pdf 文档,有兴趣的朋友可以去 Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf[带书签] 进行下载. 1. java.util.concurrent - Java 并发工具包 Java 5 添加了一个新的包到 Java 平

【JAVA秒会技术之多线程】Java 并发工具包 java.util.concurrent 用户指南

1. java.util.concurrent - Java 并发工具包 Java 5 添加了一个新的包到 Java 平台,java.util.concurrent 包.这个包包含有一系列能够让 Java 的并发编程变得更加简单轻松的类.在这个包被添加以前,你需要自己去动手实现自己的相关工具类.本文我将带你一一认识 java.util.concurrent 包里的这些类,然后你可以尝试着如何在项目中使用它们.本文中我将使用 Java 6 版本,我不确定这和 Java 5 版本里的是否有一些差异.

Java并发集合的实现原理

本文简要介绍Java并发编程方面常用的类和集合,并介绍下其实现原理. AtomicInteger 可以用原子方式更新int值.类 AtomicBoolean.AtomicInteger.AtomicLong 和 AtomicReference 的实例各自提供对相应类型单个变量的访问和更新.基本的原理都是使用CAS操作: boolean compareAndSet(expectedValue, updateValue); 如果此方法(在不同的类间参数类型也不同)当前保持expectedValue,

如何使用Contemplate ThreadSafe发现并判断Java并发问题

事实证明,要发挥多核硬件所带来的收益是很困难和有风险的.当使用并发正确和安全地编写Java软件时,我们需要很仔细地进行思考.因为错误使用并发会导致偶尔才出现的缺陷,这些缺陷甚至能够躲过最严格的测试环境. 静态分析工具提供了一种方式,可以在代码执行之前探查并修正并发错误.它能够在代码执行之前分析程序的源码或编译形成的字节码,进而发现隐藏在代码之中的缺陷. Contemplate的ThreadSafe Solo是一个商用的Eclipse静态分析插件,其目的就是专门用来发现并诊断隐藏在Java程序之中

java.util.concurrent包(7)——Exchanger使用

Java 并发 API 提供了一种允许2个并发任务间相互交换数据的同步应用.更具体的说,Exchanger类允许在2个线程间定义同步点,当2个线程到达这个点,他们相互交换数据类型,使用第一个线程的数据类型变成第二个的,然后第二个线程的数据类型变成第一个的. 示例1 一个人有零食,另一个人有钱,他们两个想等价交换,对好口号在某个地方相见,一个人先到了之后,必须等另一个人带着需要的东西来了之后,才能开始交换. public class ExchangerTest {public static voi

Java并发编程相关面试问题

基础概念 1.什么是原子操作?在Java Concurrency API中有哪些原子类(atomic classes)? 原子操作(atomic operation)意为"不可被中断的一个或一系列操作" .处理器使用基于对缓存加锁或总线加锁的方式来实现多处理器之间的原子操作. 在Java中可以通过锁和循环CAS的方式来实现原子操作. CAS操作--Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作. 原子操作是

Java并发编程:从根源上解析volatile关键字的实现

Java并发编程:volatile关键字解析 1.解析概览 内存模型的相关概念 并发编程中的三个概念 Java内存模型 深入剖析volatile关键字 使用volatile关键字的场景 2.内存模型的相关概念 缓存一致性问题.通常称这种被多个线程访问的变量为共享变量. 也就是说,如果一个变量在多个CPU中都存在缓存(一般在多线程编程时才会出现),那么就可能存在缓存不一致的问题. 为了解决缓存不一致性问题,通常来说有以下2种解决方法: 通过在总线加LOCK#锁的方式 通过缓存一致性协议 这2种方式