8.2. 消息队列

这里选择使用ZeroMQ的原因主要考虑的是性能问题,其他MQ方案可能会阻塞数据库。

8.2.1. 背景

之前我发表过一篇文章 http://netkiller.github.io/journal/mysql.plugin.fifo.html

该文章中提出了通过fifo 管道,实现数据库与其他进程的通信。属于 IPC 机制(同一个OS/服务器内),后我有采用ZeroMQ重新实现了一个 RPC 机制的方案,同时兼容IPC(跨越OS/服务器)

各种缩写的全称 IPC(IPC :Inter-Process Communication 进程间通信),ITC(ITC : Inter Thread Communication 线程间通信)与RPC(RPC: Remote Procedure Calls远程过程调用)。

支持协议

inproc://my_publisher
tcp://server001:5555
ipc:///tmp/feeds/0
		

8.2.2. 应用场景

如果你想处理数据,由于各种原因你不能在程序中实现,你可以使用这个插件。当数据库中的数据发生变化的时候出发某种操作,你可以使用这个插件。

有时候你的项目可能是外包的,项目结束后外包方不会在管你,你有无法改动现有代码,或者根本不敢改。你可以使用这个插件

采用MQ技术对数据库无任何压力,与采用程序处理并无不同,省却了写代码

处理方法,可以采用同步或者异步方式

例 8.1. 发送短信

发送短信、邮件,只需要查询出相应手机号码,发送到MQ的服务端,服务端接收到手机号码后,放入队列中,多线程程序从队列中领取任务,发送短信。

select zmq_client('tcp://localhost:5555',mobile) from demo where subscribed='Y' ...;
			

传递多个参数,可以使用符号分隔

select zmq_client('tcp://localhost:5555',concat(name,',',mobile,', news')) from demo;
select zmq_client('tcp://localhost:5555',concat(name,'|',mobile,'|news')) from demo;
			

json格式

select zmq_client('tcp://localhost:5555',concat('{name:',name,', tel:',mobile,', template:news}')) from demo;
			

建议采用异步方式,MQ端接收到任务立即反馈 “成功”信息,因为我们不太关心是否能发送成功,本身就是盲目性的发送,手机号码是否可用我们无从得知,短信或者邮件的发送到达率不是100%,所以当进入队列后,让程序自行处理,将成功或者失败信息记录到日志中即可。

例 8.2. 处理图片

首先查询出需要处理图片,然后将路径与分辨率传递给MQ另一端的处理程序

select zmq_client('tcp://localhost:5555',concat(image,',800x600}')) from demo;
			

建议采用异步方式,MQ端接收到任务立即反馈 “成功”信息

例 8.3. 身份证号码校验

select zmq_client('tcp://localhost:5555',id_number) from demo;
			

可以采用同步方案,因为MQ款处理几乎不会延迟,直接将处理结构反馈

例 8.4. 静态化案例

情景模拟,你的项目是你个电商项目,采用外包模式开发,项目已经开发完成。外包放不再负责维护,你现在要做静态化。增加该功能,你要检查多处与商品表相关的造作。

于其改代码,不如程序从外部处理,这样更保险。我们只要写一个程序将动态 URL 下载保存成静态即可,当数据发生变化的时候重新下载覆盖即可

CREATE DEFINER=`dba`@`%` TRIGGER `demo_after_insert` AFTER INSERT ON `demo` FOR EACH ROW BEGIN
	select zmq_client('tcp://localhost:5555', NEW.id);
END
CREATE DEFINER=`dba`@`%` TRIGGER `demo_after_update` AFTER UPDATE ON `demo` FOR EACH ROW BEGIN
	select zmq_client('tcp://localhost:5555', NEW.id);
END
CREATE DEFINER=`dba`@`%` TRIGGER `demo_after_delete` AFTER DELETE ON `demo` FOR EACH ROW BEGIN
	select zmq_client('tcp://localhost:5555', NEW.id);
END
			

MQ 另一端的服务会下载http://www.example.com/goods.php?cid=111&id=100, 然后生成html页面,http://www.example.com/111/100.html

插入会新建页面,更新会覆盖页面,删除会删除页面

这样无论商品的价格,属性改变,静态化程序都会做出相应的处理。

例 8.5. 数据同步案例

我们有多个数据库,A 库里面的数据发生变化后,要同步书库到B库,或者处理结果,或者数据转换后写入其他数据库中

方法也是采用触发器或者EVENT处理

8.2.3. Mysql plugin

我开发了几个 UDF, 共4个 function

UDF

zmq_client(sockt,message)

sockt .成功返回true,失败返回flase.

有了上面的function后你就可以在begin,commit,rollback 直接穿插使用,实现在事物处理期间做你爱做的事。也可以用在触发器与EVENT定时任务中。

8.2.4. plugin 的开发与使用

编译UDF你需要安装下面的软件包

sudo apt-get install pkg-config
sudo apt-get install libmysqlclient-dev

sudo apt-get install gcc gcc-c++ make cmake
		

https://github.com/netkiller/mysql-zmq-plugin

编译udf,最后将so文件复制到 /usr/lib/mysql/plugin/

git clone https://github.com/netkiller/mysql-zmq-plugin.git
cd mysql-zmq-plugin

cmake .
make && make install

装载

create function zmq_client returns string soname 'libzeromq.so';
create function zmq_publish returns string soname 'libzeromq.so';
		

卸载

drop function zmq_client;
drop function zmq_publish;
		

确认安装成功

mysql> SELECT * FROM `mysql`.`func` where name like 'zmq%';
+-------------+-----+--------------+----------+
| name        | ret | dl           | type     |
+-------------+-----+--------------+----------+
| zmq_client  |   0 | libzeromq.so | function |
| zmq_publish |   0 | libzeromq.so | function |
+-------------+-----+--------------+----------+
2 rows in set (0.00 sec)

8.2.5. 插件如何使用

插件有很多种用法,这里仅仅一个例

编译zeromq server 测试程序

cd test
cmake .
make
		

启动服务进程

./server
		

发送Hello world!

mysql> select zmq_client('tcp://localhost:5555','Hello world!');
+---------------------------------------------------+
| zmq_client('tcp://localhost:5555','Hello world!') |
+---------------------------------------------------+
| Hello world! OK                                   |
+---------------------------------------------------+
1 row in set (0.01 sec)

查看服务器端是否接收到信息。

$ ./server
Received: Hello world!
		

我们再将上面的例子使用触发器进一步优化

mysql> select zmq_client('tcp://localhost:5555',mobile) from demo;
+-------------------------------------------+
| zmq_client('tcp://localhost:5555',mobile) |
+-------------------------------------------+
| 13113668891 OK                            |
| 13113668892 OK                            |
| 13113668893 OK                            |
| 13322993040 OK                            |
| 13588997745 OK                            |
+-------------------------------------------+
5 rows in set (0.03 sec)

服务器端已经接收到数据库发过来的信息

$ ./server
Received: Hello world!
Received: 13113668891
Received: 13113668892
Received: 13113668893
Received: 13322993040
Received: 13588997745
		

我们可以拼装json或者序列化数据,发送给远端

mysql> select zmq_client('tcp://localhost:5555',concat('{name:',name,', tel:',mobile,'}')) from demo;
+------------------------------------------------------------------------------+
| zmq_client('tcp://localhost:5555',concat('{name:',name,', tel:',mobile,'}')) |
+------------------------------------------------------------------------------+
| {name:neo, tel:13113668891} OK                                               |
| {name:jam, tel:13113668892} OK                                               |
| {name:leo, tel:13113668893} OK                                               |
| {name:jerry, tel:13322993040} OK                                             |
| {name:tom, tel:13588997745} OK                                               |
+------------------------------------------------------------------------------+
5 rows in set (0.03 sec)

返回数据取决于你服务端怎么编写处理程序,你可以返回true/false等等。

触发器以及事务处理,这里就不演示了

原文出处:Netkiller 系列 手札
本文作者:陈景峯
转载请与作者联系,同时请务必标明文章原始出处和作者信息及本声明。

时间: 2024-09-24 15:50:36

8.2. 消息队列的相关文章

wcf-使用MSMQ消息队列的WCF的效率的问题。

问题描述 使用MSMQ消息队列的WCF的效率的问题. 请教个问题,WCF在使用MSMQ的时候,每次WCF程序处理MSMQ中的数据量,每秒只有几百条. 以下是测试数据 处理消息数: 180000 开始时间: [2013-11-01 15:35:27] Start To Save Log To DB. 结束时间: [2013-11-01 15:51:12] Finish To Save Log To DB. 耗时: 00:15:45 基本上算下来也就每秒190多条,以前好的时候可以达到200多条,其

消息队列入门(二)消息队列的规范和开源实现

1.AMQP规范 AMQP 是 Advanced Message Queuing Protocol,即高级消息队列协议.AMQP不是一个具体的消息队列实现,而 是一个标准化的消息中间件协议.目标是让不同语言,不同系统的应用互相通信,并提供一个简单统一的模型和编程接口. 目前主流的ActiveMQ和RabbitMQ都支持AMQP协议. AMQP相关的角色和职责 Producer 消息生产者 一个给exchange发送消息的程序,发送方式大致是:它首先创建一个空消息,然后填上内容.路由KEY,最后发

System V 消息队列

1.概述 消息队列可以认为是一个消息链表,System V 消息队列使用消息队列标识符标识.具有足够特权的任何进程都可以往一个队列放置一个消息,具有足够特权的任何进程都可以从一个给定队列读出一个消息.在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达.System V 消息队列是随内核持续的,只有在内核重起或者显示删除一个消息队列时,该消息队列才会真正被删除.可以将内核中的某个特定的消息队列画为一个消息链表,如下图所示: 对于系统中没个消息队列,内核维护一个msqid

消息队列和管道的区别(转载)

转载自:http://bbs.chinaunix.net/viewthread.php?tid=265266 作者:beginner-bj 请问管道和消息队列有什么不同  管道通信(PIPE) 管道通信方式的中间介质是文件,通常称这种文件为管道文件.两个进程利用管道文件进行通信时,一个 进程为写进程,另一个进程为读进程.写进程通过写端(发送端)往管道文件中写入信息:读进程通过读 端(接收端)从管道文件中读取信息.两个进程协调不断地进行写.读,便会构成双方通过管道传递信息 的流水线. 利用系统调用

使用 PHP 消息队列实现 Android 与 Web 通信

需求描述很简单:Android 发送数据到 Web 网页上. 系统: Ubuntu 14.04 + apache2 + php5 + Android 4.4 思路是 socket + 消息队列 + 服务器发送事件,下面的讲解步骤为 Android 端,服务器端,前端.重点是在于 PHP 进程间通信. Android 端比较直接,就是一个 socket 程序.需要注意的是,如果直接在活动主线程里面创建 socket 会报一个 android.os.NetworkOnMainThreadExcept

PHP memcache实现消息队列实例

现在memcache在服务器缓存应用比较广泛,下面我来介绍memcache实现消息队列等待的一个例子,有需要了解的朋友可参考. memche消息队列的原理就是在key上做文章,用以做一个连续的数字加上前缀记录序列化以后消息或者日志.然后通过定时程序将内容落地到文件或者数据库. php实现消息队列的用处比如在做发送邮件时发送大量邮件很费时间的问题,那么可以采取队列. 方便实现队列的轻量级队列服务器是: starling支持memcache协议的轻量级持久化服务器 https://github.co

消息队列在VB.NET数据库开发中的应用

数据|数据库 我们先简单的了解一下什么是消息队列(MSMQ)?消息队列是 Windows 2000(NT也有MSMQ,WIN95/98/me/xp不含消息队列服务但是支持客户端的运行)操作系统中通讯的基础,也是用于创建分布式.松散连接通讯应用程序的工具.这些应用程序可以通过不同种类的网络进行通讯,也可以与脱机的计算机通讯.消息队列分为用户创建队列和系统队列,用户队列分为: · "公共队列"在整个可传递消息的"消息队列"网络中复制并传输,并且有可能由网络连接的所有站点

C#:消息队列应用程序

程序 摘要:本文概述一种用于处理若干消息队列的 Windows 服务解决方案,重点介绍 .NET 框架和 C# 应用程序. 下载 CSharpMessageService.exe 示例文件 (86 KB) 目录简介.NET 框架应用程序 应用程序结构 服务类 检测设备 安装总结参考资料 简介Microsoft 近期推出了一种用于生成集成应用程序的新平台--Microsoft .NET 框架..NET 框架允许开发人员使用任何编程语言迅速生成和部署 Web 服务和应用程序.Microsoft In

Linux下C编程:消息队列实例

消息队列是一系列连续排列的消息,保存在内核中,通过消息队列的引用标识符来访问.消息队列与管道很相似,但使用消息队列的好处是对每个消息指定了特定消息类型,接收消息的进程可以请求接收下一条消息,也可以请求接收下一条特定类型的消息. #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #include <stdio.h> #include <string.h> int mai

消息队列工具类(MSMQ)

所要做的是简化msmq的调用代码以及做到可替代性,实现后,调用消息队列代 码变为如下所示: QueueService srv = QueueService.Instance(); //检查存储DTO1的队列是否存在,如不存在则自动建立 srv.Prepare<DTO1>(); //发送类型为DTO1的消息 srv.Send<DTO1>(new DTO1() { p1="1", p2="2" }); //发送类型为DTO1的消息,并且将发送的消