java并发包消息队列

消息队列常用于有生产者和消费者两类角色的多线程同步场景

 

BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具。

主要的方法是:put、take一对阻塞存取;add、poll一对非阻塞存取。

         插入:

                   1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则抛出异常

        2)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.

        3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞直到BlockingQueue里面有空间再继续.

         读取:

        4)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null

        5)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止

         其他

int remainingCapacity();返回队列剩余的容量,在队列插入和获取的时候,不要瞎搞,数 据可能不准

boolean remove(Object o); 从队列移除元素,如果存在,即移除一个或者更多,队列改    变了返回true

public boolean contains(Object o); 查看队列是否存在这个元素,存在返回true

int drainTo(Collection<? super E> c); 传入的集合中的元素,如果在队列中存在,那么将     队列中的元素移动到集合中

int drainTo(Collection<? super E> c, int maxElements); 和上面方法的区别在于,制定了移   动的数量

案例:


package blockingqueue;

 

import java.util.concurrent.BlockingQueue;

 

public class Consumer implements Runnable {

    BlockingQueue<String> queue;

   

    public Consumer(BlockingQueue<String> queue) {

      this.queue = queue;

   }

   

   @Override

   public void run() {

      try {

         String consumer = Thread.currentThread().getName();

         System.out.println(consumer);

         //如果队列为空,会阻塞当前线程

         String temp = queue.take();

         System.out.println(consumer + "消费者  get a product:" + temp);

      } catch (Exception e) {

         e.printStackTrace();

      }

   }

 

}


package blockingqueue;

 

import java.util.concurrent.BlockingQueue;

 

public class Producer implements Runnable {

   BlockingQueue<String> queue;   

    public Producer(BlockingQueue<String> queue) { 

        this.queue = queue; 

    }   

    @Override 

    public void run() { 

        try

            String temp = "A Product, 生产线程:" 

                    + Thread.currentThread().getName(); 

            queue.put(temp);//如果队列是满的话,会阻塞当前线程 

            System.out.println("生产者 I have made a product: " 

                 + Thread.currentThread().getName());

        } catch (InterruptedException e) { 

            e.printStackTrace(); 

        } 

    }

}


package blockingqueue;

 

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.LinkedBlockingQueue;

 

public class Test {

 

   public static void main(String[] args) throws Exception {

      BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);

      // BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

      // 不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE

      // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);

      Consumer consumer = new Consumer(queue);

      Producer producer = new Producer(queue);

      for (int i = 0; i < 3; i++) {

         new Thread(producer, "Producer" + (i + 1)).start();

      }

      for (int i = 0; i < 5; i++) {

         new Thread(consumer, "Consumer" + (i + 1)).start();

      }

     

      Thread.sleep(5000);

     

      new Thread(producer, "Producer" + (5)).start();

   }

}

 

BlockingQueue有四个具体的实现类,常用的两种实现类为:

 

1、ArrayBlockingQueue:一个由数组支持的有界阻塞队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的。

 

2、LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的。

         LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。

 

LinkedBlockingQueue和ArrayBlockingQueue区别:

 

LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.

 

生产者消费者的示例代码:

见代码  TestBlockingQueue  TestBlockingQueueConsumer   TestBlockingQueueProducer


package blockingqueue;

 

import java.util.Random;

import java.util.concurrent.BlockingQueue;

 

public class TestBlockingQueueProducer implements Runnable {

   BlockingQueue<String> queue;

   Random random = new Random();

 

   public TestBlockingQueueProducer(BlockingQueue<String> queue) {

      this.queue = queue;

   }

 

   @Override

   public void run() {

 

      for (int i = 0; i < 10; i++) {

         try {

            Thread.sleep(random.nextInt(10));

            String task = Thread.currentThread().getName() + " made a product " + i;

 

            System.out.println(task);

            queue.put(task);  //阻塞方法

         } catch (InterruptedException e) {

             

            e.printStackTrace();

         }

 

      }

   }

}


package blockingqueue;

 

import java.util.Random;

import java.util.concurrent.BlockingQueue;

 

public class TestBlockingQueueConsumer implements Runnable {

   BlockingQueue<String> queue;

    Random random = new Random();

   

    public TestBlockingQueueConsumer(BlockingQueue<String> queue){ 

        this.queue = queue; 

    }       

    @Override 

    public void run() { 

        try { 

          Thread.sleep(random.nextInt(10));

          System.out.println(Thread.currentThread().getName()+ "trying...");

            String temp = queue.take();//如果队列为空,会阻塞当前线程 

            int remainingCapacity = queue.remainingCapacity();

            System.out.println(Thread.currentThread().getName() + " get a job " +temp);

            // System.out.println("队列中的元素个数: "+ remainingCapacity);

        } catch (InterruptedException e) { 

            e.printStackTrace();

        } 

    }

}


package blockingqueue;

 

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.LinkedBlockingQueue;

 

public class TestBlockingQueue {

 

   public static void main(String[] args) {

      BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);

      // BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

      // 不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE

      // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);

      TestBlockingQueueConsumer consumer = new TestBlockingQueueConsumer(queue);

      TestBlockingQueueProducer producer = new TestBlockingQueueProducer(queue);

      for (int i = 0; i < 3; i++) {

         new Thread(producer, "Producer" + (i + 1)).start();

      }

      for (int i = 0; i < 5; i++) {

         new Thread(consumer, "Consumer" + (i + 1)).start();

      }

   }

}

 

时间: 2024-07-30 14:52:50

java并发包消息队列的相关文章

RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)

版权声明:本文为博主原创文章,转载注明出处http://blog.csdn.net/u013142781 目录(?)[+] 一.消息队列使用场景或者其好处 消息队列一般是在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量. 在项目启动之初来预测将来项目会碰到什么需求,是极其困难的.消息队列在处理过程中间插入了一个隐含的.基于数据的接口层,两边的处理过程都要实现这一接口.这允许你独立的扩展或修改两边的处理过

java消息队列-将一个对象加入一个消息队列有什么用

问题描述 将一个对象加入一个消息队列有什么用 将一个对象加入一个消息队列有什么用 一个消息队列本身是一个什么类的对象啊 有什么特点 解决方案 放在那里等待处理,队列你可以理解为一个列表

消息队列入门(二)消息队列的规范和开源实现

1.AMQP规范 AMQP 是 Advanced Message Queuing Protocol,即高级消息队列协议.AMQP不是一个具体的消息队列实现,而 是一个标准化的消息中间件协议.目标是让不同语言,不同系统的应用互相通信,并提供一个简单统一的模型和编程接口. 目前主流的ActiveMQ和RabbitMQ都支持AMQP协议. AMQP相关的角色和职责 Producer 消息生产者 一个给exchange发送消息的程序,发送方式大致是:它首先创建一个空消息,然后填上内容.路由KEY,最后发

使用 PHP 消息队列实现 Android 与 Web 通信

需求描述很简单:Android 发送数据到 Web 网页上. 系统: Ubuntu 14.04 + apache2 + php5 + Android 4.4 思路是 socket + 消息队列 + 服务器发送事件,下面的讲解步骤为 Android 端,服务器端,前端.重点是在于 PHP 进程间通信. Android 端比较直接,就是一个 socket 程序.需要注意的是,如果直接在活动主线程里面创建 socket 会报一个 android.os.NetworkOnMainThreadExcept

PHP memcache实现消息队列实例

现在memcache在服务器缓存应用比较广泛,下面我来介绍memcache实现消息队列等待的一个例子,有需要了解的朋友可参考. memche消息队列的原理就是在key上做文章,用以做一个连续的数字加上前缀记录序列化以后消息或者日志.然后通过定时程序将内容落地到文件或者数据库. php实现消息队列的用处比如在做发送邮件时发送大量邮件很费时间的问题,那么可以采取队列. 方便实现队列的轻量级队列服务器是: starling支持memcache协议的轻量级持久化服务器 https://github.co

消息队列工具类(MSMQ)

所要做的是简化msmq的调用代码以及做到可替代性,实现后,调用消息队列代 码变为如下所示: QueueService srv = QueueService.Instance(); //检查存储DTO1的队列是否存在,如不存在则自动建立 srv.Prepare<DTO1>(); //发送类型为DTO1的消息 srv.Send<DTO1>(new DTO1() { p1="1", p2="2" }); //发送类型为DTO1的消息,并且将发送的消

UNIX环境高级编程:system V消息队列

unix早期通信机制中的信号能够传送的信息量有限,管道则只能传送无格式字节流,这远远是不够的. 消息队列(也叫报文队列)客服了这些缺点: 消息队列就是一个消息的链表. 可以把消息看作一个记录,具有特定的格式. 进程可以按照一定的规则向消息队列中添加新消息:另一些进程可以从消息队列中读走消息. 消息队列是随内核持续的,只有内核重启或人工删除时,该消息队列才会被删除. system V消息队列使用消息队列标识符标识.具有足够特权的任何进程都可以往一个给定队列放置一个消息,具有足够特权的任何进程都可以

当设计消息队列时我们关心什么

应用消息队列可以对系统进行解耦,流量削峰,在分布式系统设计中,消息队列是重要的组件之一. 在开发中应用过ActiveMQ,kafka等mq,不过对消息队列背后的实现原理关注不多,其实了解消息队列背后的实现特别重要, 比如对一致性等实现的关注,可以帮助我们在开发中避免踩坑,规避问题的出现.这篇文章简单探讨下当设计和实现一个消息队列时,我们需要关心哪些地方.   消息队列功能和特性 一个传统意义上的消息队列,需要支持消息的发送,接受和消息暂存的功能. 在实际应用中,对消息队列的要求远不止于此,在不同

基于条件变量的消息队列

     条件变量是线程之前同步的另一种机制.条件变量给多线程提供了一种会和的场所.当条件变量和互斥锁一起使用时,允许线程以无竞争的方式等待特定的条件发生.这样大大减少了锁竞争引起的线程调度和线程等待.      消息队列是服务器端开发过程中绕不开的一道坎,前面,我已经实现了一个基于互斥锁和三队列的消息队列,性能很不错.博客园中的其他园主也实现了很多基于环形队列和lock-free的消息队列,很不错,今天我们将要实现一个基于双缓冲.互斥锁和条件变量的消息队列:这个大概也参考了一下java的blo