进程间通信的另类实现

背景

Node.js 内置的进程间通信使用非常简单,但也有一定的局限性,只能在父子进程间通信。下面是官方文档给的一个例子。

首先是父进程的 parent.js :

const cp = require('child_process');const n = cp.fork(`${__dirname}/sub.js`);

n.on('message', (m) => {  console.log('PARENT got message:', m);});

n.send({ hello: 'world' });

接着再看看子进程 sub.js 的实现:

process.on('message', (m) => {  console.log('CHILD got message:', m);});

process.send({ foo: 'bar' });

如果两个进程间没有父子这种亲缘关系又如何通信呢,本文就为大家讲解 Midway 5.0 中如何使用更灵活的 socket 实现任意进程间的通信。

协议设计

既然是通信,那么通信协议的设计是必不可少的。就像以前经常在电影里看到的两个人通话时,都会加上一句 over 来告知对方自己要说的已经说完了。父子进程间的通信协议也采用了这种最简单最高效的方式,双方在发送消息时都会在消息末尾加上一个回车符 \n,表示本次发送的消息就这么多,也就是对方就收消息时遇到 \n 表明本次消息接收完毕。

消息接收后需要对其进行解码,或者说是反序列化,最终便于识别和使用。父子进程间通信就采用了 JSON.encode 和 JSON.decode 来实现消息的编码和解码。

父子进程间采用的这种通信协议非常的简单,但是也非常高效,能够满足大部分使用场景。像 HSF 这类 RPC 调用通信协议就比较复杂了,我们平时遇到最多的就是 HTTP 协议,做 web 开发的同学肯定都比较清楚协议的规则了。

本次实现的利用 socket 实现进程间通信也采用这种最简单的方式。

实现

实现协议之前回想一下整个通信的流程,首先双方建立一条全双工的通信信道,待 2 边都 ready 后消息便可以发送消息了,2 边既是消息的接收方也是消息的发送方。我们平时会将一方称为 server,另一方称为 client,这是在功能上的划分,一般 server 会有多个client 同时连接。

协议解析

在双方开始通信之前,我们先来实现协议的解析 parse.js,非常的简单。

'use strict';const StringDecoder = require('string_decoder').StringDecoder;const EventEmitter = require('events');

class Parser extends EventEmitter {    constructor() {        super();        this.decoder = new StringDecoder('utf8');        this.jsonBuffer = '';    }

encode(message) {        return JSON.stringify(message) + '\n';    }

feed(buf) {        let jsonBuffer = this.jsonBuffer;        jsonBuffer += this.decoder.write(buf);        let i, start = 0;        while ((i = jsonBuffer.indexOf('\n', start)) >= 0) {            const json = jsonBuffer.slice(start, i);            const message = JSON.parse(json);            this.emit('message', message);            start = i + 1;        }        this.jsonBuffer = jsonBuffer.slice(start);    }}

module.exports = Parser;

socket 通信

我们平时用到的 socket 大都是 TCP 类型的,因为要涉及到两个远程进程间的通信。如果只是实现本地进程间通信,可以选择更高效的文件 socket,同时也避免额外占用一个端口的情况。

Client

在使用上 TCP socket 和 file socket 差别不大,前者监听某个端口,后者监听某个临时文件,需要注意的是监听文件的路径在 Windows 和 Unix 上有些不一样。

On Windows, the local domain is implemented using a named pipe. The path must refer to an entry in \?\pipe\ or \.\pipe.

开始之前,大致列出客户端需要有哪些功能

  • 连接到服务器
  • 监听服务器发送的数据,按照协议规则解析出消息实体
  • 提供向服务器发送消息的接口

client.js

'use strict';const path = require('path');const net = require('net');const Parser = require('./parser');const EventEmitter = require('events');const os = require('os');const tmpDir = os.tmpDir();let sockPath = path.join(tmpDir, 'midway.sock');

if (process.platform === 'win32') {    sockPath = sockPath.replace(/^\//, '');    sockPath = sockPath.replace(/\//g, '-');    sockPath = '\\\\.\\pipe\\' + sockPath;}

class Client extends EventEmitter{    constructor(options) {        options = options || {};        super();        if (options.socket) {            this.socket = options.socket;        } else {            this.socket = net.connect(sockPath);        }        this.bind();    }

bind() {        const parser = new Parser();        const socket = this.socket;        socket.on('data', (buf) => {            parser.feed(buf);        });

parser.on('message', (message) => {            this.emit('message', message);        });        this.parser = parser;    }

send(message) {        this.socket.write(this.parser.encode(message));    }}

module.exports = Client;

Server

实现之前先梳理下 server 端要有哪些基础的功能,

  • 创建一个 net server
  • 监听某个文件
  • 接收新连接上的客户端
  • 根据接收到的数据按协议规则解析出消息实体,处理客户端请求

下面就是一个简单的 server 实现

'use strict';const path = require('path');const fs = require('fs');const net = require('net');const Client = require('./client');const EventEmitter = require('events');const os = require('os');const tmpDir = os.tmpDir();let sockPath = path.join(tmpDir, 'midway.sock');

if (process.platform === 'win32') {    sockPath = sockPath.replace(/^\//, '');    sockPath = sockPath.replace(/\//g, '-');    sockPath = '\\\\.\\pipe\\' + sockPath;}

class Server extends EventEmitter{    constructor() {        super();        this.server = net.createServer((socket)=> this.handleConnection(socket));    }

listen(callback) {        if (fs.existsSync(sockPath)) {            fs.unlinkSync(sockPath);        }        this.server.listen(sockPath, callback);    }

handleConnection(socket) {        const client = new Client({            socket: socket        });        client.on('message', (message) => {            this.handleRequest(message, client);        });        this.emit('connect', client);    }

handleRequest(message, client) {        this.emit('message', message, client);    }}

module.exports = Server;

demo

至此,我们已经实现了类似父子进程间通信的功能了。还记得上篇《多进程下的测试覆盖率》中使用到的那个简单的 RPC demo,现在我们使用上面提到的 socket 通信方式重新实现一个。

client.js

'use strict';const Client = require('../lib/Client');

let rid = 0;const service = {};const queue = [];const requestQueue = new Map();

function start(ready) {    const client = new Client();

function send() {        rid++;        let args = [].slice.call(arguments);        const method = args.slice(0,1)[0];        const callback = args.slice(-1)[0];

const req = {            rid: rid,            method:method,            args:args.slice(1,-1)        };

requestQueue.set(rid,Object.assign({            callback: callback        }, req));

client.send(req);    }

client.on('message', function(message){        if (message.action === 'register') {            message.methods.forEach((method) => {                service[method] = send.bind(null, method);            });            ready(service);        } else {            const req = requestQueue.get(message.rid);            const callback = req.callback;            if (message.success) {                callback(null, message.data);            } else {                callback(new Error(message.error));            }            requestQueue.delete(message.rid);        }    });}

start((service)=> {    service.add(1,2,3,4,5, function(err, result) {        console.log(`1+2+3+4+5 = ${result}`);    });

service.time(1,2,3,4,5, function(err, result) {        console.log(`1*2*3*4*5 = ${result}`);    });});

server.js

'use strict';const Server = require('../lib/server');const server = new Server();server.listen();

const service = {    add() {        const args = [].slice.call(arguments);        return args.slice().reduce(function(a,b) {            return a+b;        });    },

time() {        const args = [].slice.call(arguments);        return new Promise((resolve, reject)=> {            setTimeout( ()=> {                const ret = args.slice().reduce(function(a,b) {                    return a*b;                });                resolve(ret);            }, 1000);        });    }}

server.on('connect', (client) => {    client.send({        action:'register',        methods: Object.keys(service)    });});

server.on('message', function(message, client) {    let ret = { success: false, rid: message.rid };    const method = message.method;    if (service[method]) {        try {            const result = service[method].apply(service, message.args);            ret.success = true;            if(result.then) {                return result.then((data)=> {                    ret.data = data;                    client.send(ret);                }).catch((err)=>{                    ret.success = false;                    ret.error = err.message;                    client.send(err);                })            }            ret.data = result;        } catch (err) {            ret.error = err.message;        }    }    client.send(ret);});

先启动 server,然后运行 client,控制台输出

1+2+3+4+5 = 151*2*3*4*5 = 120

小结

不论是本文讲解的简单进程间通信,还是更复杂的 RPC 调用,整个的设计实现流程相差不大。生产环境中还需要处理各种异常,网络连接错误,协议解析错误等等,有兴趣的同学可以继续在本 demo上继续完善~

该文章来自:http://taobaofed.org/blog/2016/01/26/nodejs-ipc/

作者:淘杰

时间: 2024-10-04 16:21:42

进程间通信的另类实现的相关文章

写了一个简单的NodeJS实现的进程间通信的例子

1. cluster介绍 大家都知道nodejs是一个单进程单线程的服务器引擎,不管有多么的强大硬件,只能利用到单个CPU进行计算.所以,有人开发了第三方的cluster,让node可以利用多核CPU实现并行.随着nodejs的发展,让nodejs上生产环境,就必须是支持多进程多核处理!在V0.6.0版本,Nodejs内置了cluster的特性.自此,Nodejs终于可以作为一个独立的应用开发解决方案,映入大家眼帘了. cluster是一个nodejs内置的模块,用于nodejs多核处理.clu

PhotoShop打造山间的女孩另类幻想场景合成教程

教程教我们用PhotoShop打造一幅趴在山间的女孩另类幻想场景效果,这里我给大家制作一个幻想场景,有美女.有山.有空中河.还有星球. 用不同的调整将一副玄幻的画面展示给你. photoshop教程效果图: 1.创建一个1400*800像素的新文件,背景默认.将海滩图片拽入画布调整大小如下图所示. 在这里我们之需要沙滩的部分,所以给图层添加"蒙板"用画笔将天空部分隐藏 这时得到如下效果 点击图层面板下方的"创建新的图层或调整图层"来应用以下三个选项调整图层 黑白 将

数据库进程间通信解决方案

数据库进程间通信解决方案 数据库与其他第三方应用程序进程间通信解决方案 Mr. Neo Chen (netkiller), 陈景峰(BG7NYT) 中国广东省深圳市龙华新区民治街道溪山美地518131+86 13113668890+86 755 29812080<netkiller@msn.com> $Id: MySQL-plugin.xml 587 2013-12-16 14:00:00Z netkiller $ 版权 2011, 2012, 2013 http://netkiller.gi

网上出售剩余时间女话务员另类兼职

春节前夕,大家都在忙着过年的事,南京有个打工妹却在经营着自己的"剩余时间",只要你有钱并且有需要,你就可以买下她的时间,让她为你服务.据了解,这种另类的兼职方式,正在这个网络时代崭露头角. 发现:女孩网上发帖卖剩余时间 "小女子,江苏扬州人士,22岁.现在在南京的一家通讯公司上班,是一名小小的话务员.由于我工作的特殊性,我在每天14点之前是闲暇的.现在我想出售我剩余的人生,由你来安排我的生活.我的剩余人生为真正需要的人服务.让生活变得更加充实,安排我的时间是你们的权利,为你们

Marquee标记的另类用法

运行下面的代码: <marquee direction="down" width="400" height="80" bgcolor="#f6f6f6"> <marquee direction="right" width="300" height="80%">网页教学网欢迎您的光临! </marquee></marquee&g

网站推广的另类方法解密

解密|推广|网站推广 说到网站推广,大家的方法也是千奇百怪,各有各的招,不管方式如何,最终只要能把网站流量实实在在的提高一个档次,那就是好方法.对于一些常规的网站推广方法,大家也都见多不怪,这里就不赘述.我在这里谈几个比较我认为比较另类的方法 (1)"明星"效益法.忠实的fans是会关注明星的一切活动的.倘若站长也能够在自己的活动群,甚至整个网络都有一定的好的口碑的话,其号召力是不可想像的.对于个人站长而言要想成为"明星"就要有更多的关于自己正面报道的的文章或是自己

在.NET中使用命名管道完成进程间通信

进程 你曾经需要在同一台机器的两个.NET应用程序间进行数据交换吗?例如,一个Web站点和一个Windows服务?.NET框架提供了几种好的选择来完成进程间通信(IPC):Web Service,Remoting.最快的是Remoting,因为它使用TCP通道和二进制格式. 然而,如果需要频繁地从一个应用程序调用另外一个应用程序,并且你主要关心的是性能,Remoting还是显得慢了一点.让Remoting变慢的,不是协议,而是序列化. 通常来说,Remoting是很不错的,但如果仅限于本地机器的

连接数据库时发生&amp;amp;quot;一般性网络错误&amp;amp;quot;的另类解释

错误|连接数据库|网络   连接数据库时发生   "一般性网络错误"   的另类解释  Revision History:Version Date Creator Description 1.0.0.1 2003-11-15 郑昀 草稿Implementation Scope:本文档将说明出现一种不容易想到原因的访问数据库时发生"一般性网络错误",错误报告的来源是ADODB,错误号是"-2147467259,或者0x80004005".   继续

如何寻求另类方法解决网站排名不稳定的现象

  大多数站长肯定都经历过网站一波三折的经历,今天也许网站的排名还在后几页,明天就进入了百度首页,而在等几天网站排名突然有丧失,这些经历让很多站长感受到做网站的无奈,其实对于一名站长朋友来说网站优化出现波折经历不一定是坏事,只要找准解决方法,学会化解难题,那么我们的建站经历也会很顺畅的. 下面从网站的几种波折经历谈一谈互联网建站遇到波折如何处理,如何通过另类的解决方法帮助站长化解难题,从而为网站的崛起而产生助力. 第一类:网站是新站,波折处理要稳妥 有一部分网站排名出现波折是因为网站本身是新网站