生产者和消费者是多线程经典的问题,生产者和消费者问题的核心是同步的问题,同步问题的核心是要保证同一个资源被多个线程并发访问时的完整性,常用的方法是采用信号或加锁机制,保证资源在任一时刻只能被一个线程访问。这一问题用java来实现的话主要有4种方式。1.wait()/notify();2.await()/signal(); 3.blockingQuene 4.PipedInputStream/pipedOutputStream
下面分别来实现。
1.利用wait()和notify()来实现
Wait()方法:当缓冲区已空/满时,生产者/消费者停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行。
Notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。
下面看看代码实现:
首先定义商店类:
package ConsumerAndProducerProblem; import java.util.LinkedList; /** * @author: zhuwei * @ClassName: 商店类 * @Description: TODO * @Date: 下午3:58:01 */ public class Storage { //定义仓库最大容量100 private final int MAX_SIZE = 100; private LinkedList<Object> list= new LinkedList<>(); //生产num个商品 public void produce(int num) throws Exception { synchronized(list) { //假如仓库容量不足 if(list.size()+num>MAX_SIZE) { System.out.println("仓储容量不足"); //线程等待 list.wait(); } //仓库容量可以容量生产者的生产,则生产 for(int i = 0;i < num;i++) { list.add(new Object()); } System.out.println("生产者生产产品数量为:"+ num); list.notifyAll(); } } //消费num个商品 public void consume(int num) throws Exception { synchronized(list) { //加入仓库中的商品不能满足消费者的需求,线程等待 if(list.size() < num) { System.out.println("仓库中的商品不能满足消费者需求"); list.wait(); } for(int i = 0;i < num;i++) { list.remove(); } System.out.println("消费者消费商品数量为:"+num); list.notifyAll(); } } }
定义生产者类
package ConsumerAndProducerProblem; /** * @author: zhuwei * @ClassName: 生产者线程 * @Description: TODO * @Date: 下午3:57:15 */ public class Consumer implements Runnable { //消费商品数量 private int number; private Storage storage; public void consume(int num) { try { storage.consume(num); } catch (Exception e) { // TODO Auto-generatedcatch block e.printStackTrace(); } } public int getNumber() { return number; } public void setNumber(int number) { this.number = number; } public Storage getStorage() { return storage; } public void setStorage(Storage storage) { this.storage = storage; } @Override public void run() { // TODO Auto-generatedmethod stub consume(number); } }
定义消费者类:
package ConsumerAndProducerProblem; /** * @author: zhuwei * @ClassName: 消费者线程 * @Description: TODO * @Date: 下午3:57:38 */ public class Producer implements Runnable { //生产的商品数量 private int number; private Storage storage; public void produce(int num) { try { storage.produce(num); } catch (Exception e) { // TODO Auto-generatedcatch block e.printStackTrace(); } } public int getNumber() { return number; } public void setNumber(int number) { this.number = number; } public Storage getStorage() { return storage; } public void setStorage(Storage storage) { this.storage = storage; } @Override public void run() { // TODO Auto-generatedmethod stub produce(number); } }
创建测试类:
package ConsumerAndProducerProblem; public class Test { public static void main(String[] args) { // TODO Auto-generatedmethod stub //仓库对象 Storage storage = new Storage(); //消费者对象 Consumer c1 = new Consumer(); c1.setNumber(10); c1.setStorage(storage); Consumer c2 = new Consumer(); c2.setNumber(80); c2.setStorage(storage); //生产者对象 Producer p1 = new Producer(); p1.setNumber(20); p1.setStorage(storage); Producer p2 = new Producer(); p2.setNumber(50); p2.setStorage(storage); p1.run(); c1.run(); p2.run(); c2.run(); } }
2.await()和signal()方法
该方法中用到的几个类做一下说明:
ReentrantLock():一个可重入的互斥锁Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大。 ReentrantLock 将由最近成功获得锁,并且还没有释放该锁的线程所拥有。当锁没有被另一个线程所拥有时,调用lock
的线程将成功获取该锁并返回。如果当前线程已经拥有该锁,此方法将立即返回。可以使用 isHeldByCurrentThread()和
getHoldCount()方法来检查此情况是否发生。
Condition():将 Object监视器方法(wait、notify和
notifyAll)分解成截然不同的对象,以便通过将这些对象与任意Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock替代了
synchronized 方法和语句的使用,Condition替代了 Object 监视器方法的使用。
条件(也称为条件队列 或条件变量)为线程提供了一个含义,以便在某个状态条件现在可能为 true 的另一个线程通知它之前,一直挂起该线程(即让其“等待”)。因为访问此共享状态信息发生在不同的线程中,所以它必须受保护,因此要将某种形式的锁与该条件相关联。等待提供一个条件的主要属性是:以原子方式释放相关的锁,并挂起当前线程,就像 Object.wait 做的那样。
Condition实例实质上被绑定到一个锁上。要为特定 Lock 实例获得 Condition 实例,请使用其 newCondition()方法。
定义仓库的代码为:
package ConsumerAndProducerProblem2; import java.util.LinkedList; importjava.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; importjava.util.concurrent.locks.ReentrantLock; public class Storage { //定义仓库容量 privatefinal int MAX_SIZE = 100; // privateLinkedList<Object> list = new LinkedList<>(); //定义锁 privateLock lock = new ReentrantLock(); privateCondition full = lock.newCondition(); privateCondition empty = lock.newCondition(); publicint getMAX_SIZE() { returnMAX_SIZE; } //生产商品 publicvoid produce(int number) throws Exception { //获得锁 lock.lock(); { //加入仓库容量不能容纳生产者生产的商品,线程阻塞 while(list.size()+number> MAX_SIZE) { System.out.println("仓库空间无法容量生产的商品"); full.await(); } for(inti = 0;i < number;i++) list.add(newObject()); System.out.println("生产者生产商品数量:"+number); full.notifyAll(); empty.notifyAll(); //释放锁 lock.unlock(); } } //消费商品 publicvoid consume(int number) throws Exception { //获得锁 lock.lock(); //加入仓库的商品不能满足消费者消费需求 while(list.size()< number) { System.out.println("仓库中的商品不能满足消费者需求"); empty.wait(); } for(inti = 0;i<number;i++) { list.remove(); } System.out.println("消费者消费产品数量为:"+number); full.notifyAll(); empty.notifyAll(); //释放锁 lock.unlock(); } }
生产者、消费者和测试类的代码保存不变