linux下使用hiredis异步API实现sub/pub消息订阅和发布的功能

本文转载自链接:

http://blog.csdn.net/chenzba/article/details/51224715


最近使用redis的c接口——hiredis,使客户端与redis服务器通信,实现消息订阅和发布(PUB/SUB)的功能,我把遇到的一些问题和解决方法列出来供大家学习。

       废话不多说,先贴代码。

redis_publisher.h

/*************************************************************************
    > File Name: redis_publisher.h
    > Author: chenzengba
    > Mail: chenzengba@gmail.com
    > Created Time: Sat 23 Apr 2016 10:15:09 PM CST
    > Description: 封装hiredis,实现消息发布给redis功能
 ************************************************************************/

#ifndef REDIS_PUBLISHER_H
#define REDIS_PUBLISHER_H

#include <stdlib.h>
#include <hiredis/async.h>
#include <hiredis/adapters/libevent.h>
#include <string>
#include <vector>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <boost/tr1/functional.hpp>

class CRedisPublisher
{
public:
    CRedisPublisher();
    ~CRedisPublisher();

    bool init();
    bool uninit();
    bool connect();
    bool disconnect();

    bool publish(const std::string &channel_name,
        const std::string &message);

private:
     // 下面三个回调函数供redis服务调用
    // 连接回调
    static void connect_callback(const redisAsyncContext *redis_context,
        int status);

	// 断开连接的回调
    static void disconnect_callback(const redisAsyncContext *redis_context,
        int status);

	// 执行命令回调
    static void command_callback(redisAsyncContext *redis_context,
        void *reply, void *privdata);

    // 事件分发线程函数
    static void *event_thread(void *data);
    void *event_proc();

private:
     // libevent事件对象
    event_base *_event_base;
	// 事件线程ID
    pthread_t _event_thread;
	// 事件线程的信号量
    sem_t _event_sem;
	// hiredis异步对象
    redisAsyncContext *_redis_context;
};

#endif

redis_publisher.cpp

/*************************************************************************
    > File Name: redis_publisher.cpp
    > Author: chenzengba
    > Mail: chenzengba@gmail.com
    > Created Time: Sat 23 Apr 2016 10:15:09 PM CST
    > Description:
 ************************************************************************/

#include <stddef.h>
#include <assert.h>
#include <string.h>
#include "redis_publisher.h"

CRedisPublisher::CRedisPublisher():_event_base(0), _event_thread(0),
_redis_context(0)
{
}

CRedisPublisher::~CRedisPublisher()
{
}

bool CRedisPublisher::init()
{
    // initialize the event
    _event_base = event_base_new();    // 创建libevent对象
    if (NULL == _event_base)
    {
        printf(": Create redis event failed.\n");
        return false;
    }

    memset(&_event_sem, 0, sizeof(_event_sem));
    int ret = sem_init(&_event_sem, 0, 0);
    if (ret != 0)
    {
        printf(": Init sem failed.\n");
        return false;
    }

    return true;
}

bool CRedisPublisher::uninit()
{
    _event_base = NULL;

    sem_destroy(&_event_sem);
    return true;
}

bool CRedisPublisher::connect()
{
    // connect redis
    _redis_context = redisAsyncConnect("127.0.0.1", 6379);    // 异步连接到redis服务器上,使用默认端口
    if (NULL == _redis_context)
    {
        printf(": Connect redis failed.\n");
        return false;
    }

    if (_redis_context->err)
    {
        printf(": Connect redis error: %d, %s\n",
            _redis_context->err, _redis_context->errstr);    // 输出错误信息
        return false;
    }

    // attach the event
    redisLibeventAttach(_redis_context, _event_base);    // 将事件绑定到redis context上,使设置给redis的回调跟事件关联

    // 创建事件处理线程
    int ret = pthread_create(&_event_thread, 0, &CRedisPublisher::event_thread, this);
    if (ret != 0)
    {
        printf(": create event thread failed.\n");
        disconnect();
        return false;
    }

	// 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态
    redisAsyncSetConnectCallback(_redis_context,
        &CRedisPublisher::connect_callback);

	// 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连
    redisAsyncSetDisconnectCallback(_redis_context,
        &CRedisPublisher::disconnect_callback);

	// 启动事件线程
    sem_post(&_event_sem);
    return true;
}

bool CRedisPublisher::disconnect()
{
    if (_redis_context)
    {
        redisAsyncDisconnect(_redis_context);
        redisAsyncFree(_redis_context);
        _redis_context = NULL;
    }

    return true;
}

bool CRedisPublisher::publish(const std::string &channel_name,
    const std::string &message)
{
    int ret = redisAsyncCommand(_redis_context,
        &CRedisPublisher::command_callback, this, "PUBLISH %s %s",
        channel_name.c_str(), message.c_str());
    if (REDIS_ERR == ret)
    {
        printf("Publish command failed: %d\n", ret);
        return false;
    }

    return true;
}

void CRedisPublisher::connect_callback(const redisAsyncContext *redis_context,
    int status)
{
    if (status != REDIS_OK)
    {
        printf(": Error: %s\n", redis_context->errstr);
    }
    else
    {
        printf(": Redis connected!\n");
    }
}

void CRedisPublisher::disconnect_callback(
    const redisAsyncContext *redis_context, int status)
{
    if (status != REDIS_OK)
    {
		// 这里异常退出,可以尝试重连
        printf(": Error: %s\n", redis_context->errstr);
    }
}

// 消息接收回调函数
void CRedisPublisher::command_callback(redisAsyncContext *redis_context,
    void *reply, void *privdata)
{
    printf("command callback.\n");
	// 这里不执行任何操作
}

void *CRedisPublisher::event_thread(void *data)
{
    if (NULL == data)
    {
        printf(": Error!\n");
        assert(false);
        return NULL;
    }

    CRedisPublisher *self_this = reinterpret_cast<CRedisPublisher *>(data);
    return self_this->event_proc();
}

void *CRedisPublisher::event_proc()
{
    sem_wait(&_event_sem);

	// 开启事件分发,event_base_dispatch会阻塞
    event_base_dispatch(_event_base);

    return NULL;
}

redis_subscriber.h

/*************************************************************************
    > File Name: redis_subscriber.h
    > Author: chenzengba
    > Mail: chenzengba@gmail.com
    > Created Time: Sat 23 Apr 2016 10:15:09 PM CST
    > Description: 封装hiredis,实现消息订阅redis功能
 ************************************************************************/

#ifndef REDIS_SUBSCRIBER_H
#define REDIS_SUBSCRIBER_H

#include <stdlib.h>
#include <hiredis/async.h>
#include <hiredis/adapters/libevent.h>
#include <string>
#include <vector>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <boost/tr1/functional.hpp>

class CRedisSubscriber
{
public:
    typedef std::tr1::function<void (const char *, const char *, int)>         NotifyMessageFn;	// 回调函数对象类型,当接收到消息后调用回调把消息发送出去

    CRedisSubscriber();
    ~CRedisSubscriber();

	bool init(const NotifyMessageFn &fn);	// 传入回调对象
    bool uninit();
    bool connect();
    bool disconnect();

    // 可以多次调用,订阅多个频道
    bool subscribe(const std::string &channel_name);

private:
    // 下面三个回调函数供redis服务调用
    // 连接回调
    static void connect_callback(const redisAsyncContext *redis_context,
        int status);

	// 断开连接的回调
    static void disconnect_callback(const redisAsyncContext *redis_context,
        int status);

	// 执行命令回调
    static void command_callback(redisAsyncContext *redis_context,
        void *reply, void *privdata);

    // 事件分发线程函数
    static void *event_thread(void *data);
    void *event_proc();

private:
    // libevent事件对象
    event_base *_event_base;
	// 事件线程ID
    pthread_t _event_thread;
	// 事件线程的信号量
    sem_t _event_sem;
	// hiredis异步对象
    redisAsyncContext *_redis_context;

	// 通知外层的回调函数对象
    NotifyMessageFn _notify_message_fn;
};

#endif

redis_subscriber.cpp:

/*************************************************************************
    > File Name: redis_subscriber.cpp
    > Author: chenzengba
    > Mail: chenzengba@gmail.com
    > Created Time: Sat 23 Apr 2016 10:15:09 PM CST
    > Description:
 ************************************************************************/

#include <stddef.h>
#include <assert.h>
#include <string.h>
#include "redis_subscriber.h"

CRedisSubscriber::CRedisSubscriber():_event_base(0), _event_thread(0),
_redis_context(0)
{
}

CRedisSubscriber::~CRedisSubscriber()
{
}

bool CRedisSubscriber::init(const NotifyMessageFn &fn)
{
    // initialize the event
    _notify_message_fn = fn;
    _event_base = event_base_new();    // 创建libevent对象
    if (NULL == _event_base)
    {
        printf(": Create redis event failed.\n");
        return false;
    }

    memset(&_event_sem, 0, sizeof(_event_sem));
    int ret = sem_init(&_event_sem, 0, 0);
    if (ret != 0)
    {
        printf(": Init sem failed.\n");
        return false;
    }

    return true;
}

bool CRedisSubscriber::uninit()
{
    _event_base = NULL;

    sem_destroy(&_event_sem);
    return true;
}

bool CRedisSubscriber::connect()
{
    // connect redis
    _redis_context = redisAsyncConnect("127.0.0.1", 6379);    // 异步连接到redis服务器上,使用默认端口
    if (NULL == _redis_context)
    {
        printf(": Connect redis failed.\n");
        return false;
    }

    if (_redis_context->err)
    {
        printf(": Connect redis error: %d, %s\n",
            _redis_context->err, _redis_context->errstr);    // 输出错误信息
        return false;
    }

    // attach the event
    redisLibeventAttach(_redis_context, _event_base);    // 将事件绑定到redis context上,使设置给redis的回调跟事件关联

    // 创建事件处理线程
    int ret = pthread_create(&_event_thread, 0, &CRedisSubscriber::event_thread, this);
    if (ret != 0)
    {
        printf(": create event thread failed.\n");
        disconnect();
        return false;
    }

	// 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态
    redisAsyncSetConnectCallback(_redis_context,
        &CRedisSubscriber::connect_callback);

	// 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连
    redisAsyncSetDisconnectCallback(_redis_context,
        &CRedisSubscriber::disconnect_callback);

	// 启动事件线程
    sem_post(&_event_sem);
    return true;
}

bool CRedisSubscriber::disconnect()
{
    if (_redis_context)
    {
        redisAsyncDisconnect(_redis_context);
        redisAsyncFree(_redis_context);
        _redis_context = NULL;
    }

    return true;
}

bool CRedisSubscriber::subscribe(const std::string &channel_name)
{
    int ret = redisAsyncCommand(_redis_context,
        &CRedisSubscriber::command_callback, this, "SUBSCRIBE %s",
        channel_name.c_str());
    if (REDIS_ERR == ret)
    {
        printf("Subscribe command failed: %d\n", ret);
        return false;
    }

    printf(": Subscribe success: %s\n", channel_name.c_str());
    return true;
}

void CRedisSubscriber::connect_callback(const redisAsyncContext *redis_context,
    int status)
{
    if (status != REDIS_OK)
    {
        printf(": Error: %s\n", redis_context->errstr);
    }
    else
    {
        printf(": Redis connected!");
    }
}

void CRedisSubscriber::disconnect_callback(
    const redisAsyncContext *redis_context, int status)
{
    if (status != REDIS_OK)
    {
		// 这里异常退出,可以尝试重连
        printf(": Error: %s\n", redis_context->errstr);
    }
}

// 消息接收回调函数
void CRedisSubscriber::command_callback(redisAsyncContext *redis_context,
    void *reply, void *privdata)
{
    if (NULL == reply || NULL == privdata) {
        return ;
    }

	// 静态函数中,要使用类的成员变量,把当前的this指针传进来,用this指针间接访问
    CRedisSubscriber *self_this = reinterpret_cast<CRedisSubscriber *>(privdata);
    redisReply *redis_reply = reinterpret_cast<redisReply *>(reply);

	// 订阅接收到的消息是一个带三元素的数组
    if (redis_reply->type == REDIS_REPLY_ARRAY &&
    redis_reply->elements == 3)
    {
        printf(": Recieve message:%s:%d:%s:%d:%s:%d\n",
        redis_reply->element[0]->str, redis_reply->element[0]->len,
        redis_reply->element[1]->str, redis_reply->element[1]->len,
        redis_reply->element[2]->str, redis_reply->element[2]->len);

		// 调用函数对象把消息通知给外层
        self_this->_notify_message_fn(redis_reply->element[1]->str,
            redis_reply->element[2]->str, redis_reply->element[2]->len);
    }
}

void *CRedisSubscriber::event_thread(void *data)
{
    if (NULL == data)
    {
        printf(": Error!\n");
        assert(false);
        return NULL;
    }

    CRedisSubscriber *self_this = reinterpret_cast<CRedisSubscriber *>(data);
    return self_this->event_proc();
}

void *CRedisSubscriber::event_proc()
{
    sem_wait(&_event_sem);

	// 开启事件分发,event_base_dispatch会阻塞
    event_base_dispatch(_event_base);

    return NULL;
}

问题1:hiredis官网没有异步接口的实现例子。

        hiredis提供了几个异步通信的API,一开始根据API名字的理解,我们实现了跟redis服务器建立连接、订阅和发布的功能,可在实际使用的时候,程序并没有像我们预想的那样,除了能够建立连接外,任何事情都没发生。

        网上查了很多资料,原来hiredis的异步实现是通过事件来分发redis发送过来的消息的,hiredis可以使用libae、libev、libuv和libevent中的任何一个实现事件的分发,网上的资料提示使用libae、libev和libuv可能发生其他问题,这里为了方便就选用libevent。hireds官网并没有对libevent做任何介绍,也没用说明使用异步机制需要引入事件的接口,所以一开始走了很多弯路。

        关于libevent的使用这里就不再赘述,详情可以见libevent官网。

libevent官网:http://libevent.org/

libevent api文档:https://www.monkey.org/~provos/libevent/doxygen-2.0.1/include_2event2_2event_8h.html#6e9827de8c3014417b11b48f2fe688ae

CRedisPublisher和CRedisSubscriber的初始化过程:

初始化事件处理,并获得事件处理的实例:

_event_base = event_base_new();

在获得redisAsyncContext *之后,调用

redisLibeventAttach(_redis_context, _event_base);

这样就将事件处理和redis关联起来,最后在另一个线程调用

event_base_dispatch(_event_base);

启动事件的分发,这是一个阻塞函数,因此,创建了一个新的线程处理事件分发,值得注意的是,这里用信号灯_event_sem控制线程的启动,意在程序调用

    redisAsyncSetConnectCallback(_redis_context,
        &CRedisSubscriber::connect_callback);
    redisAsyncSetDisconnectCallback(_redis_context,
        &CRedisSubscriber::disconnect_callback);

之后,能够完全捕捉到这两个回调。

问题2 奇特的‘ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context’错误

        有些人会觉得这两个类设计有点冗余,我们发现CRedisPublisher和CRedisSubscriber很多逻辑是一样的,为什么不把他们整合到一起成一个类,既能够发布消息也能够订阅消息。其实一开始我就是这么干的,在使用的时候发现,用同个redisAsynContex *对象进行消息订阅和发布,与redis服务连接会自动断开,disconnect_callback回调会被调用,并且返回奇怪的错误:ERR
only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context,因此,不能使用同个redisAsyncContext *对象实现发布和订阅。这里为了减少设计的复杂性,就将两个类的逻辑分开了。

        当然,你也可以将相同的逻辑抽象到一个基类里,并实现publish和subscribe接口。

问题3 相关依赖的库

        编译之前,需要安装hiredis、libevent和boost库,我是用的是Ubuntu x64系统。

hiredis官网:https://github.com/redis/hiredis

下载源码解压,进入解压目录,执行make && make install命令。

libevent官网:http://libevent.org/下载最新的稳定版

解压后进入解压目录,执行命令

./configure -prefix=/usr

sudo make && make install

boost库:直接执行安装:sudo apt-get install libboost-dev

如果你不是用std::tr1::function的函数对象来给外层通知消息,就不需要boost库。你可以用接口的形式实现回调,把接口传给CRedisSubscribe类,让它在接收到消息后调用接口回调,通知外层。

问题4 如何使用

        最后贴出例子代码。

publisher.cpp,实现发布消息:

/*************************************************************************
    > File Name: publisher.cpp
    > Author: chenzengba
    > Mail: chenzengba@gmail.com
    > Created Time: Sat 23 Apr 2016 12:13:24 PM CST
 ************************************************************************/

#include "redis_publisher.h"

int main(int argc, char *argv[])
{
    CRedisPublisher publisher;

    bool ret = publisher.init();
    if (!ret)
    {
        printf("Init failed.\n");
        return 0;
    }

    ret = publisher.connect();
    if (!ret)
    {
        printf("connect failed.");
        return 0;
    }

    while (true)
    {
        publisher.publish("test-channel", "Test message");
        sleep(1);
    }

    publisher.disconnect();
    publisher.uninit();
    return 0;
}

subscriber.cpp实现订阅消息:

/*************************************************************************
    > File Name: subscriber.cpp
    > Author: chenzengba
    > Mail: chenzengba@gmail.com
    > Created Time: Sat 23 Apr 2016 12:26:42 PM CST
 ************************************************************************/

#include "redis_subscriber.h"

void recieve_message(const char *channel_name,
    const char *message, int len)
{
    printf("Recieve message:\n    channel name: %s\n    message: %s\n",
        channel_name, message);
}

int main(int argc, char *argv[])
{
    CRedisSubscriber subscriber;
    CRedisSubscriber::NotifyMessageFn fn =
        bind(recieve_message, std::tr1::placeholders::_1,
        std::tr1::placeholders::_2, std::tr1::placeholders::_3);

    bool ret = subscriber.init(fn);
    if (!ret)
    {
        printf("Init failed.\n");
        return 0;
    }

    ret = subscriber.connect();
    if (!ret)
    {
        printf("Connect failed.\n");
        return 0;
    }

    subscriber.subscribe("test-channel");

    while (true)
    {
        sleep(1);
    }

    subscriber.disconnect();
    subscriber.uninit();

    return 0;
}

关于编译的问题:在g++中编译,注意要加上-lhiredis -levent参数,下面是一个简单的Makefile:

EXE=server_main client_main
CC=g++
FLAG=-lhiredis -levent
OBJ=redis_publisher.o publisher.o redis_subscriber.o subscriber.o

all:$(EXE)

$(EXE):$(OBJ)
	$(CC) -o publisher redis_publisher.o publisher.o $(FLAG)
	$(CC) -o subscriber redis_subscriber.o subscriber.o $(FLAG)

redis_publisher.o:redis_publisher.h
redis_subscriber.o:redis_subscriber.h

publisher.o:publisher.cpp
	$(CC) -c publisher.cpp

subscriber.o:subscriber.cpp
	$(CC) -c subscriber.cpp
clean:
	rm publisher subscriber *.o

致谢:

redis异步API使用libevent:http://www.tuicool.com/articles/N73uuu



时间: 2024-10-26 06:09:22

linux下使用hiredis异步API实现sub/pub消息订阅和发布的功能的相关文章

Linux下用C语言API连接MySQL数据库

像PHP和perl一样,MySQL也提供的C语言使用的API. C代码的API是随MySQL一起发布的. 它包含在mysqlclient库中, 可以使C程序来访问数据库. MySQL源码包中的许多客户端都是用C写的. 如果你正在找使用这些C API的例子, 可以看看客户端的写法.你可以在MySQL源码包的clients目录找到这些例子. 软件包 请确保你已经安装了必要的开发环境,比如gcc, mysql等等. 下面是编译一个程序所需要安装的软件包的列表 (Ubuntu为例): mysql-cli

Linux下安装oracle客户端并配置php5.3_php技巧

因项目需要在linux下进行php5.3的oracle客户端编译,简要介绍一下步骤及走过的弯路. 1.下载Oracle客户端程序包,其中包含OCI.OCCI和JDBC-OCI等相关文件. 1.1下载文件地址 http://www.oracle.com/technetwork/database/features/instant-client/index-097480.html 根据操作系统的版本选择对应的软件,我需要的是X86_64选择 Instant Client for Linux x86-6

Linux下设置定期执行脚本

  在Linux下,经常需要定期的执行一些脚本从而来实现一些功能. 在Linux下我们用crontab来实现定期的执行脚本这个功能,下面就介绍一下crontab的使用.以及我遇到的一些问题 一. crontab的使用说明 1. crond 是linux用来定期执行程序的命令.当安装完成操作系统之后,默认便会启动此任务调度命令.crond命令每分钟会定期检查是否有要执行的工作,如果有要执行的工作便会自动执行该工作.而linux任务调度的工作主要分为以下两类 a. 系统执行的工作,比如垃圾清理,备份

PDF edit一个Linux下PDF编辑软件

自由的通用PDF编辑器PDF edit是一个Linux下PDF编辑软件,可以选中局部,进行编辑,这个功能比较不错. 脚本编写扩展了编辑器应用, 也可以为此编辑器开发插件 注意: 需要QT3.x. 此软件无法在QT4.x下编译. 不过大部分发行版都带有QT3和QT4. 此编辑器在GNU GPL协议下发布 安装 sudo apt-get install pdfedit 注:有时安装完成后可能在"应用程序"里面看不到,重启X就可以了(或者终端输入pdfedit).

linux下,java调用dll文件时,dll应该放在哪

问题描述 windows下,java调用dll文件时,dll放在system32这个文件夹下面,那linux下,dll文件应该放在哪? 问题补充:radio123 写道 解决方案 楼主要伤心了 linux底下应该不能直接调用dll 二进制格式不一样解决方案二:你们有谁做过把网页生成图片的功能吗,我提问很久了,都没人搭理我,我在windows下行,在linux下就不行了解决方案三:其实我也是啊,一个功能好不容易实现了,但是linux下不能运行,郁闷死了解决方案四:这个还真没有,还是借助网上力量搜索

Linux下套接字详解(三)----几种套接字I/O模型

背景知识 阻塞和非阻塞 对于一个套接字的 I/O通信,它会涉及到两个系统对象,一个是调用这个IO的进程或者线程,另一个就是系统内核.比如当一个读操作发生时,它会经历两个阶段: ①等待数据准备 (Waiting for the data to be ready) ②将数据从内核拷贝到进程中 (Copying the data from the kernel to the process) 阻塞,在linux中,默认情况下所有的socket都是blocking,当用户进程调用了recvfrom/re

Linux下多进程/多线程编程

linux下多进程.多线程编程 linux下进程   (一) 理解Linux下进程的结构  Linux下一个进程在内存里有三部份的数据,就是"数据段","堆栈段"和"代码段",其实学过汇编语言的人一定知道,一般的CPU象I386,都有上述三种段寄存器,以方便操作系统的运行."代码段",顾名思义,就是存放了程序代码的数据,假如机器中有数个进程运行相同的一个程序,那么它们就可以使用同一个代码段.  堆栈段存放的就是子程序的返回地址

一、LINUX下blueZ的编程

  Linux下Bluez的编程实现 1. 蓝牙的各个协议栈的简介....................................................................................... 2 1.1.蓝牙技术.......................................................................................................... 2 1.1.蓝牙协议栈.

Linux 下的五种 IO 模型详细介绍_Linux

概念说明 用户空间与内核空间 现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方).操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限.为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操作系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间.针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空