nanomsg通信库的pubsub及survey

nanomsg实验——pubsub

发布订阅模式是很多消息中间件提供的常见功能。通过消息机制,能够将消息发布者和消息接收(消费)者
进行解耦。pubsub模式也是nanomsg直接支持的一直消息模型之一,因此通过pubsub模式实验,
同时也大致了解了下nanomsg的基础用法。

服务端

 代码如下 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>

void usage(const char *name)
{
    fprintf(stderr, "%s [ bind url]n", name);
}

int main(int argc, char **argv)
{
    if(argc != 2) {
        usage(argv[0]);
        exit(-1);
    }

    const char *url = argv[1];
    int sock = nn_socket(AF_SP, NN_PUB);
    if(sock < 0) {
        fprintf (stderr, "nn_socket failed: %sn", nn_strerror (errno));
        exit(-1);
    }

    if(nn_bind(sock, url) < 0) {
        fprintf(stderr, "nn_bind failed: %sn", nn_strerror(errno));
        exit(-1);
    }

    while(1) {
        time_t rawtime;
        struct tm * timeinfo;

        time (&rawtime);
        timeinfo = localtime (&rawtime);
        char *text = asctime (timeinfo);
        int textLen = strlen(text);
        text[textLen - 1] = '';

        printf ("SERVER: PUBLISHING DATE %sn", text);
        nn_send(sock, text, textLen, 0);
        sleep(1);
    }

    return 0;
}

nanomsg使用非常简单,只要直接include nanomsg/nn.h,即可使用基本API。使用内置的通信模式,
需要引入对应的头文件,如pubsub模式,引入nonomsg/pubsub.h即可。

pubsub server,需要首先通过nn_socket调用创建socket,这里模仿了POSIX接口,
函数返回一个文件描述符。因此直接通过判断返回值是否大于0,判断是否创建成功。注意第二个参数为协议,
在协议相关头文件中会定义对应的宏。然后所有操作都将基于这个文件描述符。
和berkeley sockets一样,server需要bind一个端口,nanomsg需要bind一个url。目前nanomsg支持的格式有:
* 进程内通信(inproc):url格式为inproc://test
* 进程间同in想(ipc):url格式为ipc:///tmp/test.ipc
* tcp通信:url格式为tcp://*:5555

github上源码貌似已经支持websocket了。

nanomsg的错误和UNIX相同,失败之后会设置errno,可以通过nn_strerror获取对应的错误文本。

bind完了之后,就可以通过nn_send函数向socket发送消息了。这个函数参数和berkeley sockets api接口类似。
这里直接获取当前时间,然后发出给所有订阅者。

客户端

 代码如下 复制代码
#include <stdio.h>
#include <stdlib.h>

#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>

int main(int argc, char **argv)
{
    if(argc != 3) {
        fprintf(stderr, "usage: %s NAME BIND_URLn", argv[0]);
        exit(-1);
    }
    const char *name = argv[1];
    const char *url = argv[2];

    int sock = nn_socket (AF_SP, NN_SUB);
    if(sock < 0) {
        fprintf(stderr, "fail to create socket: %sn", nn_strerror(errno));
        exit(-1);
    }
    if(nn_setsockopt (sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) {
        fprintf(stderr, "fail to set sorket opts: %sn", nn_strerror(errno));
        exit(-1);
    }

    if (nn_connect(sock, url) < 0) {
        fprintf(stderr, "fail to connect to %s : %sn", url, nn_strerror(errno));
        exit(-1);
    }

    while ( 1 ) {
        char *buf = NULL;
        int bytes = nn_recv (sock, &buf, NN_MSG, 0);
        printf ("CLIENT (%s): RECEIVED %sn", name, buf);
        nn_freemsg (buf);
    }

    nn_shutdown(sock, 0);

    return 0;
}

客户端初始化和服务端差不多,在连接服务端之前,需要通过nn_setsockopt将当前socket设置成消息订阅者。
然后通过nn_connect连接发布者,参数和服务端bind的差不多,也是一个socket、一个url。
这里的url要和服务端bind的url相同。之后就是一个死循环不停的接收发布者的消息。

测试

首先是编译,和普通c程序相同,只是增加链接nanomsg。

gcc -o pubserver pubserver.c -lnanomsg
gcc -o pubclient pubclient.c -lnanomsg

为了方便测试,写了一个简单的shell脚本:
   

 代码如下 复制代码
#!/bin/bash

BASE="$( cd "$( dirname "$0" )" && pwd )"
PUB=$BASE/pubserver
SUB=$BASE/pubclient

URL="tcp://127.0.0.1:1234"

echo "start pubserver to bind tcp: $URL"

$PUB tcp://127.0.0.1:1234 &

echo "start to start pubclient"
for((i = 0; i < 10; i++))
    do
    echo "start client$i"
    $SUB client$i $URL &
    sleep 1
done

sleep 20
echo "kill all process and exit"

for pid in `jobs -p`
do
    echo "kill $pid"
    kill $pid
done

wait

脚本很简单,首先启动一个消息发布者,然后每秒启动一个消息接受者。等待20s之后,kill掉所有子进程。

脚本的输出:

 代码如下 复制代码
start pubserver to bind tcp: tcp://127.0.0.1:1234
start to start pubclient
start client0
SERVER: PUBLISHING DATE Tue Feb 17 15:12:11 2015
start client1
SERVER: PUBLISHING DATE Tue Feb 17 15:12:12 2015
CLIENT (client0): RECEIVED Tue Feb 17 15:12:12 2015
CLIENT (client1): RECEIVED Tue Feb 17 15:12:12 2015
start client2
SERVER: PUBLISHING DATE Tue Feb 17 15:12:13 2015
CLIENT (client0): RECEIVED Tue Feb 17 15:12:13 2015
CLIENT (client1): RECEIVED Tue Feb 17 15:12:13 2015
CLIENT (client2): RECEIVED Tue Feb 17 15:12:13 2015
start client3
SERVER: PUBLISHING DATE Tue Feb 17 15:12:14 2015
CLIENT (client0): RECEIVED Tue Feb 17 15:12:14 2015
CLIENT (client1): RECEIVED Tue Feb 17 15:12:14 2015
CLIENT (client2): RECEIVED Tue Feb 17 15:12:14 2015
...
SERVER: PUBLISHING DATE Tue Feb 17 15:12:41 2015
CLIENT (client0): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client1): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client2): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client3): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client4): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client5): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client6): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client7): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client8): RECEIVED Tue Feb 17 15:12:41 2015
CLIENT (client9): RECEIVED Tue Feb 17 15:12:41 2015
kill all process and exit

可以看见每次启动一个新的订阅者,每个订阅者都能够收到发布者发布的当前时间。

nanomsg实验——survey

survey模式是由server发出询问,client针对请求回复响应的一种模式。这种模式在分布式系统中非常有用,
可以用来做服务发现、分布式事物等分布式询问。
客户端

客户端实现比较方便,除了基础调用(创建socket、连接url)之外,就是先接收服务端询问
(例子中比较简单,服务端询问是固定的,所以没有对内容进行检查)针对询问发送响应
(例子中是发送服务端当前时间)

 代码如下 复制代码
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <nanomsg/nn.h>
#include <nanomsg/survey.h>

using namespace std;

int main(int argc, const char **argv) {
    if(argc != 3) {
        fprintf(stderr, "usage: %s NAME URLn", argv[0]);
        exit(-1);
    }
    const char *name = argv[1];
    const char *url = argv[2];

    int sock = nn_socket(AF_SP, NN_RESPONDENT);
    if(sock < 0){
        fprintf(stderr, "nn_socket fail: %sn", nn_strerror(errno));
        exit(-1);
    }
    if(nn_connect(sock, url) < 0) {
        fprintf(stderr, "nn_connect fail: %sn", nn_strerror(errno));
        exit(-1);
    }

    while(1){
        char *buf = NULL;
        int bytes = nn_recv (sock, &buf, NN_MSG, 0);

        if(bytes > 0) {
            printf ("CLIENT (%s): RECEIVED "%s" SURVEY REQUESTn", name, buf);
            nn_freemsg (buf);

            char sendBuffer[128];
            time_t rawtime;
            struct tm * timeinfo;

            time (&rawtime);
            timeinfo = localtime (&rawtime);
            char *timeText = asctime (timeinfo);
            int textLen = strlen(timeText);
            timeText[textLen - 1] = '';
            sprintf(sendBuffer, "[ %s ] %s", name, timeText);
            int sendSize = strlen(sendBuffer) + 1;
            int actualSendSize = nn_send(sock, sendBuffer, sendSize, 0);

            if(actualSendSize != sendSize) {
                fprintf(stderr, "nn_send fail, expect length %d, actual length %dn", sendSize, actualSendSize);
                continue;
            }
        }
    }

    nn_shutdown(sock, 0);

    return 0;
}

这里收到消息后,就简单的打印,然后将响应数据写会给服务端。

服务端

服务端有个问题,之前搜索了几个例子都不太正常。经过尝试和简单查看代码之后发现,通过nanomsg基础api,
无法获取当前有多少客户端。但是,如果当前所有连接的客户端的响应都已经收到,再次调用nn_recv之后,
会直接返回-1,表示读取失败,同时errno(通过errno函数获取)被设置为EFSM,表示当前状态机状态不正确。

 代码如下 复制代码
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/survey.h>

using namespace std;

const char *SURVEY_TYPE = "DATE";

int main(int argc, char** argv)
{

    if ( argc != 2 ) {
        fprintf(stderr, "usage: %s URLn", argv[0]);
        exit(-1);
    }
    const char *url = argv[1];
    int sock = nn_socket(AF_SP, NN_SURVEYOR);
    if(sock < 0) {
        fprintf (stderr, "nn_socket failed: %sn", nn_strerror (errno));
        exit(-1);
    }

    if(nn_bind(sock, url) < 0) {
        fprintf(stderr, "nn_bind fail: %sn", nn_strerror(errno));
        exit(-1);
    }

    while(1) {
        int sendSize = strlen(SURVEY_TYPE) + 1;
        int actualSendSize;
        printf ("SERVER: SENDING DATE SURVEY REQUESTn");
        if ((actualSendSize = nn_send(sock, SURVEY_TYPE, sendSize, 0)) != sendSize) {
            fprintf(stderr, "nn_send fail, expect length %d, actual length %dn", sendSize, actualSendSize);
            continue;
        }

        int count = 0;
        while(1) {
            char *buf = NULL;
            int bytes = nn_recv (sock, &buf, NN_MSG, 0);
            if (bytes < 0 && nn_errno() == ETIMEDOUT) break;
            if (bytes >= 0) {
                printf ("SERVER: RECEIVED "%s" SURVEY RESPONSEn", buf);
                ++count;
                nn_freemsg (buf);
            } else {
                fprintf(stderr, "nn_recv fail: %sn", nn_strerror(errno));
                break;
            }
        }
        printf("SERVER: current receive %d survey response.n", count);
        sleep(1);
    }

    nn_shutdown(sock, 0);

    return 0;

}

这里用了两个死循环,外层循环不停尝试向客户端发起询问。完成询问后,通过另外一个死循环读取所有的客户端响应,
当读取失败时退出循环。

之前找到的源码是直接判断错误是否ETIMEDOUT,经过打印会发现每次都没有超时,而是状态机错误:

 代码如下 复制代码
/*  If no survey is going on return EFSM error. */
if (nn_slow (!nn_surveyor_inprogress (surveyor)))
    return -EFSM;

测试

测试和前文差不多,先启动一个server,然后再一个个启动client:

 代码如下 复制代码
#!/bin/bash

BASE="$( cd "$( dirname "$0" )" && pwd )"
SERVER=$BASE/surveyserver
CLIENT=$BASE/surveyclient

URL="tcp://127.0.0.1:1234"

echo "start surveyserver to bind tcp: $URL"
$SERVER tcp://127.0.0.1:1234 &

echo "start to start surveyclient"
for((i = 0; i < 10; i++))
do
    echo "start client$i"
    $CLIENT client$i $URL &
    sleep 1
done

sleep 20
echo "kill all process and exit"

for pid in `jobs -p`
do
    echo "kill $pid"
    kill $pid
done

wait

输出为:

 代码如下 复制代码
start surveyserver to bind tcp: tcp://127.0.0.1:1234
start to start surveyclient
start client0
SERVER: SENDING DATE SURVEY REQUEST
start client1
nn_recv fail: Operation cannot be performed in this state
SERVER: current receive 0 survey response.
start client2
SERVER: SENDING DATE SURVEY REQUEST
CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST
SERVER: RECEIVED "[ client0 ] Tue Feb 17 23:32:43 2015" SURVEY RESPONSE
CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST
SERVER: RECEIVED "[ client1 ] Tue Feb 17 23:32:43 2015" SURVEY RESPONSE
nn_recv fail: Operation cannot be performed in this state
SERVER: current receive 2 survey response.
start client3
SERVER: SENDING DATE SURVEY REQUEST
CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client2): RECEIVED "DATE" SURVEY REQUEST
...
SERVER: SENDING DATE SURVEY REQUEST
CLIENT (client0): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client1): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client2): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client3): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client4): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client5): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client6): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client7): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client9): RECEIVED "DATE" SURVEY REQUEST
CLIENT (client8): RECEIVED "DATE" SURVEY REQUEST
SERVER: RECEIVED "[ client0 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client1 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client2 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client3 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client4 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client5 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client6 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client7 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client9 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
SERVER: RECEIVED "[ client8 ] Tue Feb 17 23:33:09 2015" SURVEY RESPONSE
nn_recv fail: Operation cannot be performed in this state
SERVER: current receive 10 survey response.

从输出可以看见,每次最后一个接收完成之后,都会有一个“Operation cannot be performed in this state”
错误,也就是EFSM错误。

时间: 2024-09-11 05:46:23

nanomsg通信库的pubsub及survey的相关文章

android 开发 java写的 tcp 通信库,注册选择器时异常!跪求java高手指点!!!

问题描述 android 开发 java写的 tcp 通信库,注册选择器时异常!跪求java高手指点!!! public void initialize() throws IOException { boolean done = false; try { Log.e(TAG,"SocketChannel.open:IP:["+hostIp+"Port:"+hostListenningPort+"]."); // 打开监听信道并设置为非阻塞模式 s

通用Iframe跨域通信库实现

前言 前端在页面嵌入其他站点的页面时,常常会遇到跨域通信的问题,浏览器为了安全问题,限制了不同域名的JS直接调用. 解决方案 浏览器再限制跨域调用的同时,也预留了一个比较安全的通道.那就是message通道. 只要得到一个页面的window对象,都可以调用其postMessage的api,而不会有任何限制.而希望接受到其他页面message的页面只需要侦听这一消息即可.发送者代码示例 var parentWindow = window.parent; //假设这是一个放在iframe的页面,那么

Apache Qpid Proton 0.16.0,轻量通信库

Apache Qpid Proton 0.16.0 发布了.Qpid Proton 是一个 AMQP 1.0 通讯库.高性能,轻量级,应用广泛. 更新内容: 新特性和改进 PROTON-721 - [proton-j] expose ability to operate on Link capabilities PROTON-722 - [proton-j] expose ability to operate on Session capabilities and properties PROT

实现非阻塞套接字的一种简单方法 使用 JSSE 和 NIO 实现非阻塞通信的一种快速方法

简介: 尽管 SSL 阻塞操作――当读写数据的时候套接字的访问被阻塞――与对应的非阻塞方式相比提供了更好的 I/O 错误通知,但是非阻塞操作允许调用的线程继续运行.本文中,作者同时就客户端和服务器端描述了如何使用Java Secure Socket Extensions (JSSE) 和 Java NIO (新 I/O)库创建非阻塞的安全连接,并且介绍了创建非阻塞套接字的传统方法,以及使用JSSE 和 NIO 的一种可选的(必需的)方法. 阻塞,还是非阻塞?这就是问题所在.无论在程序员的头脑中多

android 开发activity如何向多个不同fragment 进行通信

问题描述 android 开发activity如何向多个不同fragment 进行通信 最近开发android ,需要一个activity 向不同的fragment 发送消息,知道的告诉小弟一声. 解决方案 setArguments方法 解决方案二: 可以用刘上的setArguments来设置,也可以用APP这个全局变量的类来专递数据,写一个APP类的单例模式,然后获取的值存到这里,在fragment里面调用就可以了 解决方案三: fragment是需要依赖activity的,fragments

你想找的Python资料这里全都有!没有你找不到!史上最全资料合集

GitHub 上有一个 Awesome - XXX 系列的资源整理,资源非常丰富,涉及面非常广.awesome-python 是 vinta 发起维护的 Python 资源列表,内容包括:Web框架.网络爬虫.网络内容提取.模板引擎.数据库.数据可视化.图片处理.文本处理.自然语言处理.机器学习.日志.代码分析等.在给大家分享之前呢,小编推荐一下一个挺不错的交流宝地,里面都是一群热爱并在学习Python的小伙伴们,大几千了吧,各种各样的人群都有,特别喜欢看到这种大家一起交流解决难题的氛围,群资料

什么时候不应该使用 XML(3)

xml   当问题非常简单时 当它可能产生其大无比的文件时 当应用程序是"一次性的"时 当需要使用 Unix 面向行的文本处理工具时 涉及使用 XML 来进行程序间通信时,有探讨的余地.但当涉及人机通信,如编程语言或配置文件时,XML 可能提供了最不自然的人机界面. 我的论点归结人与计算机硬件之间的一个问题.人类擅长处理隐含的结构,而计算机希望处理明确的结构,它设计成擅长于我们所不擅长的.计算机语言越接近自然语言,它对人类越自然,但实现越困难.在这场拔河竞赛中,稳妥的折衷方案可能是使用

SQL Server 2000数据库连接

server|数据|数据库|数据库连接 SQL Server 2000数据库连接   目录   1         SQL Server 2000服务和实例... 1 2         SQL Server 2000数据库应用结构... 2 3         SQL Server 2000安全管理... 2 3.1       服务组件的启动帐户.... 2 3.2       客户端连接用户管理... 2 4         SQL Server 2000数据库连接常见故障及处理方法...

利用Beowulf让普通PC变集群

现在,Linux在IT业里已经有着非常重要的影响,除了其具有免费.高效.可靠等优势外,对于计算机科学家和那些需要进行大量运算的科学家来说,它还是一个非常强大的工具.自从Donald Becker发起了Beowulf集群计算以后,在NASA的Goddard Space Flight Center工作的Thomas Sterling又扩展了Linux在高性能并行计算领域的应用.今天,大量以普通PC为基础的集群出现在了各个级别的实验室.工业科技中心.大学甚至是一些小的学院中.如果有人问你是否一个有关科