通通连起来——无处不在的流

最近总是听见 liu 这个东西啊,比如 liu 翔低调宣布新恋情啦、 liu 强冬告白奶茶啦、微软停止支持 IE liu 览器啦,最近我们的淘宝前端夜校讲师也提到了 liu (流)的运用。

在 Unix 系统中流就是一个很常见也很重要的概念。

Unix 的哲学:一个程序只做一件事,并做好。程序要能协作。程序要能处理文本流,因为这是最通用的接口。
– Douglas McIlroy

还记得在你刚开始学习编程,尤其是学 C 语言会接触到文件流的概念。不限于文件系统的文件,还有输入输出的逻辑文件,因为在 C 中所有流均以文件的形式出现,今天肯定不是说 C , 在以异步 IO 高效著称 Node.js 中,流也是一个值得深入理解的概念。

在前端开发中,你可能见过这样的构建代码。

gulp.task('images', ['clean'], function() {  return gulp.src(paths.images)    .pipe(gulp.dest('build/img'));});

那么什么是流?

用术语说流是对输入输出设备的抽象。以程序的角度说,流是具有方向的数据。在 Unix 系统中,我们使用符号|来实现流。
在 Node.js 中有个 stream 模块,它是一个抽象类。它的抽象接口被很多常见对象实现,比如常见的 request response

按照流动方向,stream 流分为 4 种,Readable,writable,Duplex streams 和 Transform streams。后两种可以理解为可读可写流的组合,所以优先理解可读可写流。

可以直接使用 stream_readable 创建一个流

var Readable = require('stream').Readable;var rs = new Readable;rs.push('beep ');rs.push('boop ');rs.push('null\n');rs.push(null);rs.pipe(process.stdout);

运行代码

$ node readable.jsbeep boop null

数据被原样输出了,nul l 表示输出结束。

流的模式

那么如何才能协调输入输出呢,实际上,readable 流的数据会存在缓存中,直到有个流来消耗这些数据。简单的说,就是要多少,就尽量给多少。

readable stream 有个待实现接口 _read(size)

readable._read(size)

  • size Number Number of bytes to read asynchronously

可以通过这个接口来验证流的工作

var Readable = require('stream').Readable;var rs = Readable();

var c = 97 - 1;

rs._read = function () {    if (c >= 'z'.charCodeAt(0)) return rs.push(null);    //设置延时,有个直观感受    setTimeout(function () {        rs.push(String.fromCharCode(++c));    }, 200);};

rs.pipe(process.stdout);

process.on('exit', function () {    console.error('\n_read() called ' + (c - 97) + ' times');});process.stdout.on('error', process.exit);

运行代码

$ node readable.js | head -c 5abcde_read() called 5 times

大约一秒后,5 个字符直接输出了。
所以其实 readable 流是到了一定的水位,才会将这些数据吐出来。

你可能已经感觉到了,可读流一天到晚就处在两种状态:我吐了、我不吐了!

事实上确实是这样, Node.js 文档对流状态的说明。

Readable streams have two “modes”: a flowing mode and a paused mode.

流的两种状态分别称为流模式和暂停模式
进入流模式有三种方法

  • Adding a ‘data’ event handler to listen for data.
  • Calling the resume() method to explicitly open the flow.
  • Calling the pipe() method to send the data to a Writable.

也有两种方法暂停

  • If there are no pipe destinations, by calling the pause() method.
  • If there are pipe destinations, by removing any ‘data’ event handlers, and removing all pipe destinations by calling the unpipe() method.

流的常见应用

一个流的日常任务:
举个栗子:服务器要读取文件返回给客户端:

var http = require('http');var fs = require('fs');

var server = http.createServer(function (req, res) {    fs.readFile(__dirname + '/kuaibo.txt', function (err, data) {        res.end(data);    });});server.listen(8000);

这段代码是没有问题的,测试也能通过,但是不太友好。设想 kuaibo.txt size 有点大,程序读取这个文件并存在内存中,然后把结果再返回给客户端,此时正好有 1000 个并发请求,程序消耗大量内存,大量请求被挂起,服务会变得很慢或者不可用。即使只有一个请求,如果文件足够大,也会使用户等待。

可以使用 stream + pipe 的方式实现。由于 response 是一个 writable stream。

var http = require('http');var fs = require('fs');

var server = http.createServer(function (req, res) {    var stream = fs.createReadStream(__dirname + '/kuaibo.txt');    stream.pipe(res);});server.listen(8000);

pipe 会自主管理流并且尽可能快的读取流。
可以有个合理的猜测,那个据说能边下边播的著名的全视频解码软件-快播可能就是基于对视频流片段的实时解码实现的吧?虽然我没用过-。-!

管道

既然叫管道,那管道应该可以接起来啊~

『通通连起来』 –庞统

事实上确实可以。

var r = fs.createReadStream('file.txt');var z = zlib.createGzip();var w = fs.createWriteStream('file.txt.gz');r.pipe(z).pipe(w);

在从一个 readable 流向一个 writable 流传数据的过程中,数据会自动被转换为 Buffer 对象。使用 pipe 的还有一个好处是,pipe() 方法自动管理流,把控制流的细节屏蔽了。stream 的实现经过几个版本,现在是这样的流程大约是这样的:

  • 触发 data 事件,直到 writable 流满了
  • buffer 满了触发 pause() , 进入暂停模式
  • writable 的 Buffer 清理出来了,会触发 drain 事件,这时候 pipe 调用 resume 又进入流模式,然后再触发 data 事件。

stream_readable 模块 data 事件源码

stream.on('data', function(chunk) {  debug('wrapped data');  if (state.decoder)    chunk = state.decoder.write(chunk);  // don't skip over falsy values in objectMode  if (state.objectMode && (chunk === null || chunk === undefined))    return;  else if (!state.objectMode && (!chunk || !chunk.length))    return;  var ret = self.push(chunk);  if (!ret) {    paused = true;    stream.pause();  }});

所有的流都是 EventEmitter 的实例。大部分情况下,我们只需要在上面添加我们希望触发的事件。

比如实现一个 HTTP 代理服务。

var http = require('http');var net = require('net');var url = require('url');

function request(cReq, cRes) {    var u = url.parse(cReq.url);    var options = {        hostname : u.hostname,         port     : u.port || 80,        path     : u.path,               method     : cReq.method,        headers     : cReq.headers    };    var pReq = http.request(options, function(pRes) {        cRes.writeHead(pRes.statusCode, pRes.headers);        pRes.pipe(cRes);    }).on('error', function(e) {        cRes.end();    });    cReq.pipe(pReq);}

http.createServer().on('request', request).listen(8888, '0.0.0.0');

总结

Node.js 对流做了封装,提供很多 API,流其实是一个事件分发器。仅仅基于流的特性,我们就可以做很多功能。而现在很多成熟的系统,有的基于 Java,有的基于 C#,通常已经非常庞大,而对其做一次线上的全量升级,升级周期可能会长达半年甚至更长。这时候不妨利用 Nodejs 构建一个简单的原型服务,如果可以匹配 Node.js 的适用场景,再投入时间用 Nodejs 替换系统的部分服务,说不定是个更快速达到目标的方案。

参考资料

该文章来自:http://taobaofed.org/blog/2016/01/28/nodejs-stream/

作者:愈之

时间: 2024-09-17 04:31:41

通通连起来——无处不在的流的相关文章

街旁:一本城市旅人日记

作为一款类Foursquare的本土化应用,街旁的出现和走红都充满了感性. 作者:苏娟25岁的街旁创始人刘颖比喻说,就像是一家夜店永远需要新鲜元素来吸引老顾客,社交网络也需要通过不断升级来提升用户体验,不能让用户无休止地下载新的客户端 这是一个有些感性的创业组织.作为一款类Foursquare应用,街旁上线之初的介绍文字就带着十分浓郁的文艺色彩:街旁是一本城市旅人日记,记录双腿的经历,记载心灵的感受.通过与苹果.多普达.雕刻时光咖啡以及一些音乐演出进行合作,街旁迅速在城市达人心中以极具亲和力的方

访问XML数据的三中基于树模型||基于游标||流式API比较

xml|比较|访问|数据|游标 无处不在的 XML 除了可以表示结构化和半结构化的数据之外,XML 还有许多其他特性,使其成为一种被广泛采用的数据表示格式.XML 是可扩展的,与平台无关的,并且由于其完全采用 Unicode 而支持国际化.XML 是基于文本的格式,因此,用户可以根据需要使用标准的文本编辑工具读取和编辑 XML 文档. XML 的可扩展性表现在多个方面.首先,与 HTML 不同,XML 没有固定的词汇表.相反,用户可以使用 XML 定义特定的应用程序或行业专用的词汇表.其次,与使

WPF界面设计技巧(11)-认知流文档 & 小议WPF的野心

原文:WPF界面设计技巧(11)-认知流文档 & 小议WPF的野心     流文档是WPF中的一种独特的文档承载格式,它的书写和呈现方式都很像HTML,它也几乎具备了HTML的绝大多数优势,并提供了更强的编程支持及对WPF其他元素的兼容.   直接来看代码吧,需要讲解的地方比较多,我就直接注释在代码里了,看起来更方便些:     Code<Window x:Class="流文档.Window1"     xmlns="http://schemas.microso

英特尔戴金权:从芯片到软件无处不在 释放大数据分析和机器学习潜能

5月18日,由中国电子学会主办,ZD至顶网协办的第八届中国云计算大会在北京国家会议中心隆重举办.在大会上,来自英特尔的大数据首席架构师.资深首席工程师戴金权做了主题为"从芯片到软件无处不在,释放大数据分析和机器学习潜能"的精彩演讲. 英特尔大数据首席架构师.资深首席工程师 戴金权 戴金权在演讲中讲到:"英特尔从硬件到软件做了大量工作,为基于大数据的高级分析和机器学习提供了非常好的支持,特别是基于大数据上面的机器学习和人工智能.高级分析,是英特尔看到的一个非常重要的趋势.而今天

天衣(Sky-E):会呼吸的网络与可感知的流

<连线(Wired)>杂志创始主编.全球顶级网络知名人物凯文.凯利,日前在北京的某个网络智慧峰会主题演讲中分享了代表十年后网络生活画面的若干关键词,"流"(Flow)便是其中之一.凯文.凯利认为,流是无处不在的信息交互的媒体与介质,技术可以借由流动的"流"让数字生活变得非常简单,生命也是一个流,不断流动的一个东西,随时随地都在线,这就是流的概念. 从某种意义上来说,"流"的出现,使网络更富有生命力.更加拟人化,"流"

聊聊无处不在的O2O:进入垂直时代

今天跟大家说点儿什么呢?首先啊,在正式开始之前,先说两件事吧,第一件,著名的央视播音员李瑞英和张宏民告别新闻联播,因为年龄等问题调到了央视的培训部门,多年的老声音突然离开了,小伙伴们很是不舍,据说这种悲伤不比当年罗京驾鹤西去的感觉好多少,我就纳闷了,现在的年轻人到底有多少热门在看新闻联播呢?不过,这个主播有个让人一听就过耳不忘的声音的肯定是个好主播,譬如三表,譬如我,是吧!哈哈,开个玩笑! 第二件事,是互联网方面的,百度百家最近刚发表了一个关于腾讯手游主题报道,标题是"内斗侵蚀腾讯手游 垄断玩法

Delphi压缩流和解压流的应用

软件开发者不免都要遇到压缩数据的问题!经常使用Delphi的朋友都知道,它为我们提供了两个流类(TCompressionStream和TDecompressionStream)来完成数据的压缩和解压缩,但美中不足的是,该流在Delphi 的帮助中没有详细的说明,使得它们在使用起来有一定得困难.其实在Delphi系统中提供了这两个类的源代码和库.保存在Delphi 光盘的InfoExtraslib Src和InfoExtraslibObj目录中(其中OBJ目录中保存的是库,Src目录中保存的是源代

【转】Java I/O流概念分析整理

转载地址:http://blog.csdn.net/yuebinghaoyuan/article/details/7388059  Java中的流,可以从不同的角度进行分类. 按照数据流的方向不同可以分为:输入流和输出流. 按照处理数据单位不同可以分为:字节流和字符流. 按照实现功能不同可以分为:节点流和处理流. 输出流: 输入流: 因此输入和输出都是从程序的角度来说的. 字节流:一次读入或读出是8位二进制. 字符流:一次读入或读出是16位二进制. 字节流和字符流的原理是相同的,只不过处理的单位

PostgreSQL 流式统计 - insert on conflict 实现 流式 UV(distinct), min, max, avg, sum, count ...

标签 PostgreSQL , 流式统计 , insert on conflict , count , avg , min , max , sum 背景 流式统计count, avg, min, max, sum等是一个比较有意思的场景,可用于实时大屏,实时绘制统计图表. 比如菜鸟.淘宝.阿里游戏.以及其他业务系统的FEED日志,按各个维度实时统计输出结果.(实时FEED统计,实时各维度在线人数等) PostgreSQL insert on conflict语法以及rule, trigger的功