zeromq_传说中最快的消息队列

Zeromq的资源:

Zeromq模式:

http://blog.codingnow.com/2011/02/zeromq_message_patterns.html

zeromq主页:

http://www.zeromq.org/

Zeromq Guild:

http://zguide.zeromq.org/page:all#Fixing-the-World

Zeromq 中文简介:

http://blog.csdn.net/program_think/article/details/6687076

Zero wiki:

http://en.wikipedia.org/wiki/%C3%98MQ

zeromq系列:

http://iyuan.iteye.com/blog/972949

Zeromq资源阅读:

ØMQ(Zeromq) 是一个更为高效的传输层

优势是:

1 程序接口库是一个并发框架

2 在集群和超级计算机上表现得比TCP更快

3 通过inproc, IPC, TCP, 和 multicast进行传播消息

4 通过发散,订阅,流水线,请求的方式连接

5 对于不定规模的多核消息传输应用使用异步IO

6 有非常大并且活跃的开源社区

7 支持30+的语言

8 支持多种系统

 

Zeromq定义为“史上最快的消息队列”

从网络通信的角度看,它处于会话层之上,应用层之下。

ØMQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ØMQ is from iMatix and is LGPL open source.

Zeromq中传递的数据格式是由用户自己负责,就是说如果server发送的string是有带"\0"的,那么client就必须要知道有这个

 

Pub_Sub模式。

the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.

在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,使用proxy。

Zeromq示例:

1 获取例子

git clone --depth=1 git://github.com/imatix/zguide.git

2 服务器端:

(当服务器收到消息的时候,服务器回复“World”)


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

<?php

    /*

    *  Hello World server

    *  Binds REP socket to tcp://*:5555

    *  Expects "Hello" from client, replies with "World"

    * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>

    */

     

    $context = new ZMQContext(1);

     

    //  Socket to talk to clients

    $responder = new ZMQSocket($context, ZMQ::SOCKET_REP);

    $responder->bind("tcp://*:5555");

     

    while(true) {

        //  Wait for next request from client

        $request = $responder->recv();

        printf ("Received request: [%s]\n", $request);

     

        //  Do some 'work'

        sleep (1);

     

        //  Send reply back to client

        $responder->send("World");   

 

}

3 客户端:

(客户端发送消息)


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

<?php

    /*

    *  Hello World client

    *  Connects REQ socket to tcp://localhost:5555

    *  Sends "Hello" to server, expects "World" back

    * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>

    */

     

    $context = new ZMQContext();

     

    //  Socket to talk to server

    echo "Connecting to hello world server…\n";

    $requester = new ZMQSocket($context, ZMQ::SOCKET_REQ);

    $requester->connect("tcp://localhost:5555");

     

    for($request_nbr = 0; $request_nbr != 10; $request_nbr++) {

        printf ("Sending request %d…\n", $request_nbr);

        $requester->send("Hello");

         

        $reply = $requester->recv();

        printf ("Received reply %d: [%s]\n", $request_nbr, $reply);

 

}


1

 

天气气候订阅系统:(pub-sub)

1 server端:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

<?php

    /*

    *  Weather update server

    *  Binds PUB socket to tcp://*:5556

    *  Publishes random weather updates

    * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>

    */

     

    //  Prepare our context and publisher

    $context = new ZMQContext();

    $publisher = $context->getSocket(ZMQ::SOCKET_PUB);

    $publisher->bind("tcp://*:5556");

    $publisher->bind("ipc://weather.ipc");

     

    while (true) {

        //  Get values that will fool the boss

        $zipcode     = mt_rand(0, 100000);

        $temperature = mt_rand(-80, 135);

        $relhumidity = mt_rand(10, 60);

     

        //  Send message to all subscribers

        $update = sprintf ("%05d %d %d", $zipcode, $temperature, $relhumidity);

        $publisher->send($update);

    }

2 client端:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

<?php

    /*

    *  Weather update client

    *  Connects SUB socket to tcp://localhost:5556

    *  Collects weather updates and finds avg temp in zipcode

    * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>

    */

     

    $context = new ZMQContext();

     

    //  Socket to talk to server

    echo "Collecting updates from weather server…", PHP_EOL;

    $subscriber = new ZMQSocket($context, ZMQ::SOCKET_SUB);

    $subscriber->connect("tcp://localhost:5556");

     

    //  Subscribe to zipcode, default is NYC, 10001

    $filter = $_SERVER['argc'] > 1 ? $_SERVER['argv'][1] : "10001";

    $subscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter);

     

    //  Process 100 updates

    $total_temp = 0;

    for ($update_nbr = 0; $update_nbr < 100; $update_nbr++) {

        $string = $subscriber->recv();

        sscanf ($string, "%d %d %d", $zipcode, $temperature, $relhumidity);

        $total_temp += $temperature;

    }

    printf ("Average temperature for zipcode '%s' was %dF\n",

        $filter, (int) ($total_temp / $update_nbr));


1

------------------------


1

pub-sub的proxy模式:


1

图示是:

Proxy节点的代码:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

<?php

    /*

    *  Weather proxy device

    * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>

    */

     

    $context = new ZMQContext();

     

    //  This is where the weather server sits

    $frontend = new ZMQSocket($context, ZMQ::SOCKET_SUB);

    $frontend->connect("tcp://192.168.55.210:5556");

     

    //  This is our public endpoint for subscribers

    $backend = new ZMQSocket($context, ZMQ::SOCKET_PUB);

    $backend->bind("tcp://10.1.1.0:8100");

     

    //  Subscribe on everything

    $frontend->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "");

     

    //  Shunt messages out to our own subscribers

    while(true) {

        while(true) {

            //  Process all parts of the message

            $message = $frontend->recv();

            $more = $frontend->getSockOpt(ZMQ::SOCKOPT_RCVMORE);

            $backend->send($message, $more ? ZMQ::SOCKOPT_SNDMORE : 0);

            if(!$more) {

                break; // Last message part

            }

        }

 

}

其实就是proxy同时是作为pub又作为sub的

----------------------

作者:yjf512(轩脉刃)

出处:http://www.cnblogs.com/yjf512/

本文版权归yjf512和cnBlog共有,欢迎转载,但未经作者同意必须保留此段声明

时间: 2025-01-19 13:30:42

zeromq_传说中最快的消息队列的相关文章

ActiveMQ消息队列

什么是MQ? 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术(如:WebService).排队指的是应用程序通过队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求. 为什么要用MQ? 1.调用异步化,提高服务器性能 在不使用消息队列的情况下,用户的请求数据直接写入数据库,

大型网站架构系列:消息队列(二) (转)

本文是大型网站架构系列:消息队列(二),主要分享JMS消息服务,常用消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka).[第二篇的内容大部分为网络资源的整理和汇总,供大家学习总结使用,最后有文章来源] 本次分享大纲 消息队列概述(见第一篇:大型网站架构系列:分布式消息队列(一)) 消息队列应用场景(见第一篇:大型网站架构系列:分布式消息队列(一)) 消息中间件示例(见第一篇:大型网站架构系列:分布式消息队列(一)) JMS消息服务 常用消息队列 参考(推荐)资料 本

Kafka与常见消息队列的对比

Kafka与常见消息队列的对比 RabbitMQ Erlang编写 支持很多的协议:AMQP,XMPP, SMTP, STOMP 非常重量级,更适合于企业级的开发 发送给客户端时先在中心队列排队.对路由,负载均衡或者数据持久化都有很好的支持. Redis 基于Key-Value对的NoSQL数据库 入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受: 出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能

消息队列和管道的区别(转载)

转载自:http://bbs.chinaunix.net/viewthread.php?tid=265266 作者:beginner-bj 请问管道和消息队列有什么不同  管道通信(PIPE) 管道通信方式的中间介质是文件,通常称这种文件为管道文件.两个进程利用管道文件进行通信时,一个 进程为写进程,另一个进程为读进程.写进程通过写端(发送端)往管道文件中写入信息:读进程通过读 端(接收端)从管道文件中读取信息.两个进程协调不断地进行写.读,便会构成双方通过管道传递信息 的流水线. 利用系统调用

当设计消息队列时我们关心什么

应用消息队列可以对系统进行解耦,流量削峰,在分布式系统设计中,消息队列是重要的组件之一. 在开发中应用过ActiveMQ,kafka等mq,不过对消息队列背后的实现原理关注不多,其实了解消息队列背后的实现特别重要, 比如对一致性等实现的关注,可以帮助我们在开发中避免踩坑,规避问题的出现.这篇文章简单探讨下当设计和实现一个消息队列时,我们需要关心哪些地方.   消息队列功能和特性 一个传统意义上的消息队列,需要支持消息的发送,接受和消息暂存的功能. 在实际应用中,对消息队列的要求远不止于此,在不同

ENode 1.0 - 消息队列的设计思路

开源地址:https://github.com/tangxuehua/enode 上一篇文章,简单介绍了enode框架内部的整体实现思路,用到了staged event-driven architecture的思想.通过前一篇文章,我们知道了enode内部有两种队列:command queue.event queue:用户发送的command会进入command queue排队,domain model产生的domain event会进入event queue,然后等待被dispatch到所有的

消息队列入门(一)关于消息队列

1.什么是消息队列 消息是指在两个独立的系统间传递的数据,这两个系统可以是两台计算机,也可以是两个进程. 消息可以非常简单,可以是简单的字符串,也可以是保存了数据持久化的各种类型的文档集合. 队列是在消息的传输过程中的通道,是保存消息的容器,根据不同的情形,可以有先进先出,优先级队列等区别 . 2.为什么使用消息队列 个人觉得消息队列主要的意义是解耦和异步处理,以及在高并发场景下平滑短时间内大量的服务请求. 消息队列不仅被用于系统内部组件之间的通信,同时也被用于系统跟其它服务之间的交互. 消息队

消息队列(Message Queue)简介及其使用

消息队列(Message Queue)简介及其使用 利用 MSMQ(Microsoft Message Queue),应用程序开发人员可以通过发送和接收消息方便地与应用程序进行快速可靠的通信.消息处理为您提供了有保障的消息传递和执行许多业务处理的可靠的防故障方法. MSMQ与XML Web Services和.Net Remoting一样,是一种分布式开发技术.但是在使用XML Web Services或.Net Remoting组件时,Client端需要和Server端实时交换信息,Serve

C#分布式消息队列 EQueue 2.0 发布啦

前言 最近花了我几个月的业余时间,对EQueue做了一个重大的改造,消息持久化采用本地写文件的方式.到现在为止,总算完成了,所以第一时间写文章分享给大家这段时间我所积累的一些成果. EQueue开源地址:https://github.com/tangxuehua/equeue EQueue相关文档:http://www.cnblogs.com/netfocus/category/598000.html EQueue Nuget地址:http://www.nuget.org/packages/eq