基本线程同步(八)在Lock中使用多个条件

在Lock中使用多个条件

一个锁可能伴随着多个条件。这些条件声明在Condition接口中。 这些条件的目的是允许线程拥有锁的控制并且检查条件是否为true,如果是false,那么线程将被阻塞,直到其他线程唤醒它们。Condition接口提供一种机制,阻塞一个线程和唤醒一个被阻塞的线程。

在并发编程中,生产者与消费者是经典的问题。我们有一个数据缓冲区,一个或多个数据生产者往缓冲区存储数据,一个或多个数据消费者从缓冲区中取出数据,正如在这一章中前面所解释的一样。

在这个指南中,你将学习如何通过使用锁和条件来实现生产者与消费者问题。

准备工作…

你应该事先阅读使用Lock同步代码的指南,才能更好的理解这个食谱。

如何做…

按以下步骤来实现的这个例子:

1.首先,让我们创建一个类来模拟文本文件。创建FileMock类,包括两个属性:一个字符串数组类型,名叫content,另一个int类型,名叫index。它们将存储文件内容和被检索到的模拟文件的行数。

1 public class FileMock {
2 private String content[];
3 private int index;

2.实现FileMock类的构造器,用随机字符初始化文件的内容。

01 public FileMock(int size, int length){
02 content=new String[size];
03 for (int i=0; i<size; i++){
04 StringBuilder buffer=new StringBuilder(length);
05 for (int j=0; j<length; j++){
06 int indice=(int)Math.random()*255;
07 buffer.append((char)indice);
08 }
09 content[i]=buffer.toString();
10 }
11 index=0;
12 }

3.实现hasMoreLines()方法,如果文件有更多的行来处理,则返回true,如果我们已经取到了模拟文件的尾部,则返回false。

1 public boolean hasMoreLines(){
2 return index<content.length;
3 }

4.实现getLine()方法,返回index属性所确定的行数并增加其值。

1 public String getLine(){
2 if (this.hasMoreLines()) {
3 System.out.println("Mock: "+(content.length-index));
4 return content[index++];
5 }
6 return null;
7 }

5.现在,实现Buffer类,用来实现在生产者与消费者之间共享的缓冲区。

1 public class Buffer {

6.Buffer类,有6个属性:

  • 一个类型为LinkedList<String>,名为buffer的属性,用来存储共享数据
  • 一个类型为int,名为maxSize的属性,用来存储缓冲区的长度
  • 一个名为lock的ReentrantLock对象,用来控制修改缓冲区代码块的访问
  • 两个名分别为lines和space,类型为Condition的属性
  • 一个Boolean类型,名为pendingLines的属性,表明如果缓冲区中有行
1 private LinkedList<String> buffer;
2 private int maxSize;
3 private ReentrantLock lock;
4 private Condition lines;
5 private Condition space;
6 private boolean pendingLines;

7.实现Buffer类的构造器,初始化前面描述的所有属性。

1 public Buffer(int maxSize) {
2 this.maxSize=maxSize;
3 buffer=new LinkedList<>();
4 lock=new ReentrantLock();
5 lines=lock.newCondition();
6 space=lock.newCondition();
7 pendingLines=true;
8 }

8. 实现insert()方法,接收一个String类型参数并试图将它存储到缓冲区。首先,它获得锁的控制。当它有锁的控制,它将检查缓冲区是否有空闲空 间。如果缓冲区已满,它将调用await()方法在space条件上等待释放空间。如果其他线程在space条件上调用signal()或 signalAll()方法,这个线程将被唤醒。当这种情况发生,这个线程在缓冲区上存储行并且在lines条件上调用signallAll()方法,稍 后我们将会看到,这个条件将会唤醒所有在缓冲行上等待的线程。

01 public void insert(String line) {
02 lock.lock();
03 try {
04 while (buffer.size() == maxSize) {
05 space.await();
06 }
07 buffer.offer(line);
08 System.out.printf("%s: Inserted Line: %d\n", Thread.
09 currentThread().getName(),buffer.size());
10 lines.signalAll();
11 } catch (InterruptedException e) {
12 e.printStackTrace();
13 } finally {
14 lock.unlock();
15 }
16 }

9. 实现get()方法,它返回存储在缓冲区上的第一个字符串。首先,它获取锁的控制。当它拥有锁的控制,它检查缓冲区是否有行。如果缓冲区是空的,它调用 await()方法在lines条件上等待缓冲区中的行。如果其他线程在lines条件上调用signal()或signalAll()方法,这个线程将 被唤醒。当它发生时,这个方法获取缓冲区的首行,并且在space条件上调用signalAll()方法,然后返回String。

01 public String get() {
02 String line=null;
03 lock.lock();
04 try {
05 while ((buffer.size() == 0) &&(hasPendingLines())) {
06 lines.await();
07 }
08 if (hasPendingLines()) {
09 line = buffer.poll();
10 System.out.printf("%s: Line Readed: %d\n",Thread.
11 currentThread().getName(),buffer.size());
12  
13 space.signalAll();
14 }
15 } catch (InterruptedException e) {
16 e.printStackTrace();
17 } finally {
18 lock.unlock();
19 }
20 return line;
21 }

10.实现setPendingLines()方法,用来设置pendingLines的值。当没有更多的行生产时,它将被生产者调用。

1 public void setPendingLines(boolean pendingLines) {
2 this.pendingLines=pendingLines;
3 }

11.实现hasPendingLines()方法,如果有更多的行被处理时,返回true,否则返回false。

1 public boolean hasPendingLines() {
2 return pendingLines || buffer.size()>0;
3 }

12.现在轮到生产者,实现Producer类,并指定其实现Runnable接口。

1 public class Producer implements Runnable {

13.声明两个属性:一个FileMock类对象,另一个Buffer类对象。

1 private FileMock mock;
2 private Buffer buffer;

14.实现Producer类的构造器,初始化这两个属性。

1 public Producer (FileMock mock, Buffer buffer){
2 this.mock=mock;
3 this.buffer=buffer;
4 }

15.实现run()方法,读取在FileMock对象中创建的所有行,并使用insert()方法将它们存储到缓冲区。一旦这个过程结束,使用setPendingLines()方法警告缓冲区,不会再产生更多的行。

1 @Override
2 public void run() {
3 buffer.setPendingLines(true);
4 while (mock.hasMoreLines()){
5 String line=mock.getLine();
6 buffer.insert(line);
7 }
8 buffer.setPendingLines(false);
9 }

16.接下来轮到消费者,实现Consumer类,并指定它实现Runnable接口。

1 public class Consumer implements Runnable {

17.声明Buffer对象,实现Consumer构造器来初始化这个对象。

1 private Buffer buffer;
2 public Consumer (Buffer buffer) {
3 this.buffer=buffer;
4 }

18.实现run()方法,当缓冲区有等待的行,它将获取一个并处理它。

1 @Override
2 public void run() {
3 while (buffer.hasPendingLines()) {
4 String line=buffer.get();
5 processLine(line);
6 }
7 }

19.实现辅助方法processLine(),它只睡眠10毫秒,用来模拟某种行的处理。

1 private void processLine(String line) {
2 try {
3 Random random=new Random();
4 Thread.sleep(random.nextInt(100));
5 } catch (InterruptedException e) {
6 e.printStackTrace();
7 }
8 }

20.通过创建类名为Main,且包括main()方法来实现这个示例的主类。

1 public class Main {
2 public static void main(String[] args) {

21.创建一个FileMock对象。

1 FileMock mock=new FileMock(100, 10);

22.创建一个Buffer对象。

1 Buffer buffer=new Buffer(20);

23.创建Producer对象,并且用10个线程运行它。

1 Producer producer=new Producer(mock, buffer);
2 Thread threadProducer=new Thread(producer,"Producer");

24.创建Consumer对象,并且用10个线程运行它。

1 Consumer consumers[]=new Consumer[3];
2 Thread threadConsumers[]=new Thread[3];
3 for (int i=0; i<3; i++){
4 consumers[i]=new Consumer(buffer);
5 threadConsumers[i]=new Thread(consumers[i],"Consumer "+i);
6 }

25.启动producer和3个consumers。

1 threadProducer.start();
2 for (int i=0; i<3; i++){
3 threadConsumers[i].start();
4 }

它是如何工作的…

所 有Condition对象都与锁有关,并且使用声明在Lock接口中的newCondition()方法来创建。使用condition做任何操作之前, 你必须获取与这个condition相关的锁的控制。所以,condition的操作一定是在以调用Lock对象的lock()方法为开头,以调用相同 Lock对象的unlock()方法为结尾的代码块中。

当一个线程在一个condition上调用await()方法时,它将自动释放锁的控制,所以其他线程可以获取这个锁的控制并开始执行相同操作,或者由同个锁保护的其他临界区。

注释:当一个线程在一个condition上调用signal()或signallAll()方法,一个或者全部在这个condition上等待的线程将被唤醒。这并不能保证的使它们现在睡眠的条件现在是true,所以你必须在while循环内部调用await()方法。你不能离开这个循环,直到 condition为true。当condition为false,你必须再次调用 await()方法。

你必须十分小心 ,在使用await()和signal()方法时。如果你在condition上调用await()方法而却没有在这个condition上调用signal()方法,这个线程将永远睡眠下去。

在调用await()方法后,一个线程可以被中断的,所以当它正在睡眠时,你必须处理InterruptedException异常。

不止这些…

Condition接口提供不同版本的await()方法,如下:

  • await(long time, TimeUnit unit):这个线程将会一直睡眠直到:

(1)它被中断

(2)其他线程在这个condition上调用singal()或signalAll()方法

(3)指定的时间已经过了

(4)TimeUnit类是一个枚举类型如下的常量:

DAYS,HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS,SECONDS

  • awaitUninterruptibly():这个线程将不会被中断,一直睡眠直到其他线程调用signal()或signalAll()方法
  • awaitUntil(Date date):这个线程将会一直睡眠直到:

(1)它被中断

(2)其他线程在这个condition上调用singal()或signalAll()方法

(3)指定的日期已经到了

你可以在一个读/写锁中的ReadLock和WriteLock上使用conditions。

时间: 2024-11-05 12:17:18

基本线程同步(八)在Lock中使用多个条件的相关文章

线程同步:System.Core中新的读写锁

读写锁是进程同步中经常使用的锁. 在System.Core中ReaderWriterLockSlim类比较好用,它是基于写优先策略的.它还支持从读锁升级到写锁,称为Upgradable Mode. 简单测试代码如下: private static void Test() { ReaderWriterLockSlim locker = new ReaderWriterLockSlim(); ParameterizedThreadStart reader = o => { var innerlock

UNIX环境高级编程:线程同步之互斥锁、读写锁和条件变量

一.使用互斥锁 1.初始化互斥量 pthread_mutex_t mutex =PTHREAD_MUTEX_INITIALIZER;//静态初始化互斥量 int pthread_mutex_init(pthread_mutex_t*mutex,pthread_mutexattr_t*attr);//动态初始化互斥量 int pthread_mutex_destory(pthread_mutex_t*mutex);//撤销互斥量 不能拷贝互斥量变量,但可以拷贝指向互斥量的指针,这样就可以使多个函数

c#.net多线程编程教学——线程同步_C#教程

随着对多线程学习的深入,你可能觉得需要了解一些有关线程共享资源的问题. .NET framework提供了很多的类和数据类型来控制对共享资源的访问. 考虑一种我们经常遇到的情况:有一些全局变量和共享的类变量,我们需要从不同的线程来更新它们,可以通过使用System.Threading.Interlocked类完成这样的任务,它提供了原子的,非模块化的整数更新操作. 还有你可以使用System.Threading.Monitor类锁定对象的方法的一段代码,使其暂时不能被别的线程访问. System

基本线程同步(一)引言

声明:本文是< Java 7 Concurrency Cookbook >的第二章,作者: Javier Fernández González     译者:许巧辉 校对:方腾飞 引言 在这个章节中,我们将覆盖: 同步方法 在同步的类里安排独立属性 在同步代码中使用条件 使用Lock同步代码块 使用读/写锁同步数据访问  修改Lock的公平性  在Lock中使用多个条件 介绍 在并发编程中发生的最常见的一种情况是超过一个执行线程使用共享资源.在并发应用程序中,多个线程读或写相同的数据或访问同一

UNIX环境高级编程:线程同步之条件变量及属性

条件变量变量也是出自POSIX线程标准,另一种线程同步机制.主要用来等待某个条件的发生.可以用来同步同一进程中的各个线程.当然如果一个条件变量存放在多个进程共享的某个内存区中,那么还可以通过条件变量来进行进程间的同步. 每个条件变量总是和一个互斥量相关联,条件本身是由互斥量保护的,线程在改变条件状态之间必须要锁住互斥量.条件变量相对于互斥量最大的优点在于允许线程以无竞争的方式等待条件的发生.当一个线程获得互斥锁后,发现自己需要等待某个条件变为真,如果是这样,该线程就可以等待在某个条件上,这样就不

多线程:C#线程同步lock,Monitor,Mutex,同步事件和等待句柄(中)

转自 http://www.cnblogs.com/freshman0216/archive/2008/07/30/1252345.html   本篇继续介绍WaitHandler类及其子类Mutex,ManualResetEvent,AutoResetEvent的用法..NET中线程同步的方式多的让人看了眼花缭乱,究竟该怎么去理解呢?其实,我们抛开.NET环境看线程同步,无非是执行两种操作:一是互斥/加锁,目的是保证临界区代码操作的"原子性":另一种是信号灯操作,目的是保证多个线程按

基本线程同步(七)修改Lock的公平性

修改Lock的公平性 在ReentrantLock类和 ReentrantReadWriteLock类的构造器中,允许一个名为fair的boolean类型参数,它允许你来控制这些类的行为.默认值为 false,这将启用非公平模式.在这个模式中,当有多个线程正在等待一把锁(ReentrantLock或者 ReentrantReadWriteLock),这个锁必须选择它们中间的一个来获得进入临界区,选择任意一个是没有任何标准的.true值将开启公平 模式.在这个模式中,当有多个线程正在等待一把锁(R

C#线程同步——lock,Monitor,Mutex(摘录)

线程:线程是进程的独立执行单元,每一个进程都有一个主线程,除了主线程可以包含其他的线程. 多线程的意义:多线程有助于改善程序的总体响应性,提高CPU的效率.       多线程的应用程序域是相当不稳定的,因为多个线程在同一时间内都能运行共享的功能模块.为了保护应用程序的资源不被破坏,为多线程程序提供了三种加锁的机制,分别是:Monitor类.Lock关键字和Mutex类.      1. lock        lock实现的功能是:使后进入的线程不会中断当前的线程,而是等待当前线程结束后再继续

银行取款[多线程]{使用重入锁Lock接口ReentrantLock锁确保线程同步}

经典例子:老婆(朱丽叶)老公(罗密欧),使用银行卡和存折,或者网银等,同时对同一账户操作的安全问题.  此处用多线程实现,同时取款的模拟实现,使用使用Lock接口ReentrantLock锁确保线程同步,查看取款安全隐患问题,代码如下: -----------------------------------------------------------------------------------------------------------------------------------