探究 Node.js 中的 drain 事件

起因

最近在用 Node.js 写一些网络请求相关的代码时,频繁在一些开源代码中看到 drain 事件的使用,于是我也依葫芦画瓢写到了自己的代码里面:

socket.on('drain', function(){
    console.log('drain event fired.');
});

socket.write('some data.');

实际放到线上测试的时候发现,在一些情况下,drain 事件真的会被触发,那到底什么时候会触发 drain 事件呢?drain 事件能用来做什么呢?本着打破砂锅问到底的精神,我决定探究一番。

TLDR

请直接跳到小结部分。

探究

最简单的办法就是查文档。因为我写的是网络请求相关的代码,那么我就先翻越了 net 和 socket 相关的部分。在 Node.js 官方文档中,对于 socket.write 有这么一部分描述:

Returns true if the entire data was flushed successfully to the kernel buffer. Returns false if all or part of the data was queued in user memory. 'drain' will be emitted when the buffer is again free.

也就是说,drain 事件是和 socket.write 的返回值强关联的,那么我们可以做一个简单的实验(只写关键部分):

socket.on('drain', function(){
    console.log('drain event fired.');
});

var ret = socket.write('some data.');

console.log('write data returned %s.', ret);

可是无论我怎么运行这部分代码,返回值总是 true,drain 事件没有被触发。那为啥线上就能触发呢?按照文档所说,只有全部或者部分数据被缓冲在了内存里面才会返回 false。那问题又来了,什么时候数据才会被缓冲呢?

既然是被缓冲了,那最先猜测到就是数据流量太大。就像每天上下班高峰期的文一西路那样,一旦车流量太大,前面的路口塞满了,交警就会让后面的车停下来。

好,那我们加大“车”流量(为节省篇幅,部分代码省略):

服务端代码:

var net = require('net');
net.createServer(function(socket){
    // do something
}).listen(6666, '127.0.0.1');

客户端代码:

var net = require('net');

function writeToRemote(){
    var client = new net.Socket();
    client.connect(6666, '127.0.0.1', function(){
        if(!client.write('some data by ziying.')){
            console.log('write returned false.');
        }
    });
    client.on('drain', function(){
        console.log('drain event fired.');
    });
    client.on('data', function(){
        client.destroy();
    });
}

// 加大流量
for(var i = 0; i < 1000; i++){
    writeToRemote();
}

但运行多次之后发现仍旧没有看到任何 drain 事件的迹象。难道次数不够?随着我继续增大 i 的最大值,直到遇到(libuv) kqueue(): Too many open files in system的错误时候,我仍旧没看到 drain 事件。

逼我用绝招。看 Node.js 源代码!

因为 socket.write 实际上是调用的 Stream.write(参考此处源代码),最后我们在 Stream.write的代码中找到了一丝端倪:

var ret = state.length < state.highWaterMark;
// we must ensure that previous needDrain will not be reset to false.
if (!ret)
  state.needDrain = true;

// something else

return ret;

可以看到当要写的数据的长度大于 highWaterMark (字面理解:高水位线)的时候,那么 Stream.write 就会返回 false,也就会触发 drain 事件了。

那这个高水位线具体是多少呢?可以继续看代码

var hwm = options.highWaterMark;
var defaultHwm = this.objectMode ? 16 : 16 * 1024;
this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;

默认值是 16KB,看来还是挺大的啊。所以回想一下刚才我们的实验程序,一个是写的数据比较小,另外一个是实验代码中的服务器端没有复杂逻辑,数据处理的也比较快,我们仍旧拿刚才的车流量的例子,虽然车很多很多,但是如果每辆车都开得飞快,那路也不会堵。只有当一些车比较慢,影响到了后面车的速度的时候,整体速度就会下来,就变堵了。

根据这个思路,我们换成下面这个实验:

var http = require('http');
var fs = require('fs');

http.createServer(function(req, res) {
    // 哈哈,这个代码写于圣诞,于是就用了圣诞的歌曲,大家圣诞快乐哦!
    var filePath = './Christmas.mp3';
    var stat = fs.statSync(filePath);

    response.writeHead(200, {
        'Content-Type': 'audio/mpeg',
        'Content-Length': stat.size
    });

    var readStream = fs.createReadStream(filePath);
    readStream.on('data', function(data) {
        res.write(data);
    });

    res.on('drain', function() {
      console.log('drain event fired.');
    });

    readStream.on('end', function() {
        res.end();
    });
}).listen(6969);

这里我们启动了一个简单的 http server,任何 requset 过来,都会返回一个 5M 大小的 圣诞歌曲的内容。然后我们对这个 http server 发起 1000 次 GET 请求。果然不出所料,还没等所有请求发完,一堆的 drain event fired 日志出现了:

如果对 res.write 的返回值做下日志,也会发现返回了很多 false。

原因也很容易想到,硬盘读取这个 MP3 文件的速度(测试环境为 RMBP 的 SSD 硬盘)一般都会快于将数据通过 HTTP Response 返回给用户(即便是 localhost 的访问,更不用说外网复杂错综的网络环境了),所以,当 MP3 很快就被读取过来,但又没有很快的将数据写回,那么这个 Stream 中的 data 就被缓存了。于是,我很自然而然的设想,在我这个小应用中,也许这么做并没有什么,但当应用的访问量逐渐增大的时候,这个问题就可能会爆发,比如造成个内存泄露啊之类的。

怎么用

那上面的代码该怎么改进呢?

var readStream = fs.createReadStream(filePath);
readStream.on('data', function(data) {
    if(!res.write(data)){
        readStream.pause();
    }
});

res.on('drain', function() {
    readStream.resume();
});

也就是说,当 write 返回 false 的时候,我们暂停读取流,当缓存的数据清空之后,我们再继续读取流,相当于我们根据输出流来对读取流做限流。反过来,如果写入流快于读取流,我们也可以对写入流限流。

真的是这样吗?

刚才提到了,我想如果我们没做这部分处理的话,应该很容易造成内存泄露,那我们不妨做个实验验证下。使用上面两段代码分别进行不带限流和带限流的实验,每隔 1s 使用 process.memoryUsage() 打印出来内存使用(我们在启动 server 之后就开始打印内存数据):

xxx.listen(PORT, function(){

  setInterval(function(){
    var memoryUsage = process.memoryUsage();
    // 用这个格式是为了方便我使用 Markdown 生成表格导入到 Numbers
    console.log('| %s | %s | %s |', memoryUsage.rss, memoryUsage.heapTotal, memoryUsage.heapUsed);
  }, 1000);

});

最终我得出了如下数据:

限流

没有限流

可以看到,内存使用其实差不多......

难道我猜想错了么?最简单的办法,继续看源代码。

看下 fs.createReadStream 的代码,就可以知道它的工作方式是先在内存中准备一段 Buffer,然后在 fs.read() 读取文件到这个 Buffer 中,完成一次读取时,则从 Buffer 中通过 slice 方法取出那个数据作为一个小 Buffer 对象,再通过 data 事件传给调用方[1]。

再看下 pause 具体是怎么实现的:

Readable.prototype.pause = function() {
  if (false !== this._readableState.flowing) {
    this._readableState.flowing = false;
    this.emit('pause');
  }
  return this;
};

pause 方法设置了一个状态位。在 flow 方法中,如果是 paused 的状态,那么就不再从 文件流中读取数据了:

function flow(stream) {
  var state = stream._readableState;
  debug('flow', state.flowing);
  if (state.flowing) {
    do {
      var chunk = stream.read();
    } while (null !== chunk && state.flowing);
  }
}

所以在 pause 的时候,文件的数据流依旧在 Buffer 中。

回头看下我们这个例子,其实是写比较慢,如果我们没有对写入的返回值做判断的话,Writable Stream 本身也会把多余的数据缓存起来。具体可以看 WriteOrBuffer 这个方法的实现

if (state.writing || state.corked) {
    // buffer data
} else {
    doWrite();
}

可以看出这里如果之前的写入没有完成,那么会把需要写入的数据缓存起来。

这样就不难理解了,如果我们采用限制读取流的方案,那么数据缓存在读取流的 Buffer 里,如果我们采取不限制读取流的方案,那么数据缓存在写入流的 Buffer 里,总之这部分数据都是要被缓存,只是缓存到不同的流的 Buffer 而已,所以这也能解释了为啥我们的测试结果,内存占用基本没有差别了。

更好的解决方案

其实,Node.js 里面提供了更好的解决方案,也就是 pipe。

我们将上面的代码继续改造下:

var readStream = fileSystem.createReadStream(filePath);
readStream.pipe(res);

是不是更简单了呢?

我们看下官方代码是怎么实现的(缩减版):

  var ondrain = pipeOnDrain(src);
  dest.on('drain', ondrain);

  src.on('data', ondata);
  function ondata(chunk) {
    var ret = dest.write(chunk);
    if (false === ret) {
      src.pause();
    }
  }

是不是和我们自己的实现差不多呢?当然官方代码处理的更加细致,对很多情况做了判断,感兴趣的同学不妨阅读看看:)

该怎么用?

看到这里,大家可能会问,那按照你这么说,drain 就完全没用了是么?非也非也。虽然我一开始也是这么想的,但经过我在 github 上一阵狂搜,终于有了一些端倪。

再回想我们刚才的案例,我们的写入流是以 HTTP Response 的形式返回给用户,但如果写入流是我们的一个服务呢?比如我们需要往一台 Redis Server 里面插入大量数据,而这台 Redis 又承担对外提供服务的艰巨任务,那么,为了保证我们写入的同时这台 Server 依旧能较好的对外服务,很自然的就可以想到我们可以对写入流做限流。比如node_redis 这个模块里面的一个 example

'use strict';

var redis = require('../index'),
    client = redis.createClient(),
    remaining_ops = 100000, paused = false;

function op() {
    if (remaining_ops <= 0) {
        console.error('Finished.');
        process.exit(0);
    }

    remaining_ops--;
    client.hset('test hash', 'val ' + remaining_ops, remaining_ops);
    if (client.should_buffer === true) {
        console.log('Pausing at ' + remaining_ops);
        paused = true;
    } else {
        setTimeout(op, 1);
    }
}

client.on('drain', function () {
    if (paused) {
        console.log('Resuming at ' + remaining_ops);
        paused = false;
        process.nextTick(op);
    } else {
        console.log('Got drain while not paused at ' + remaining_ops);
    }
});

op();

在这个例子中,当写入流开始缓存的时候,我们就停止写入了,等 buffer flush 完,我们再继续写入。通过这种形式,我们可以控制发送命令的速率。

如果你有更好的案例,欢迎与我一起探讨。

小结

其实在我们平时写代码的过程中,并不需要刻意用到 drain 事件。Node.js 本身已经帮我们处理了很多细节。对于流的处理,推荐大家直接用简单方便的 pipe 方法。而drain 事件比较适合用在一些需要自己手工处理限流的场景。另外,在看 Node.js 源码的时候很多细节还是很赞的,推荐大家可以阅读下。

注解

  1. 朴大师的《深入浅出 Node.js》6.4节
时间: 2024-08-02 10:28:52

探究 Node.js 中的 drain 事件的相关文章

Node.js中HTTP模块与事件模块详解_node.js

Node.js的http服务器 通过使用HTTP模块的低级API,Node.js允许我们创建服务器和客户端.刚开始学node的时候,我们都会遇到如下代码: 复制代码 代码如下: var http = require('http'); http.createServer(function (req,res) {     res.end('Hello World\n'); }).listen(3000,"127.0.0.1"); console.log("Server funni

Node.js中的流(Stream)介绍

 这篇文章主要介绍了Node.js中的流(Stream)介绍,本文讲解了什么是流.pipe方法.流的分类.Readable流状态的切换等内容,需要的朋友可以参考下     什么是流? 说到流,就涉及到一个*nix的概念:管道--在*nix中,流在Shell中被实现为可以通过 |(管道符) 进行桥接的数据,一个进程的输出(stdout)可被直接作为下一个进程的输入(stdin). 在Node中,流(Stream)的概念与之类似,代表一种数据流可供桥接的能力. pipe 流化的精髓在于 .pipe(

浅析Node.js 中 Stream API 的使用_node.js

本文由浅入深给大家介绍node.js stream api,具体详情请看下文吧. 基本介绍 在 Node.js 中,读取文件的方式有两种,一种是用 fs.readFile ,另外一种是利用 fs.createReadStream 来读取. fs.readFile 对于每个 Node.js 使用者来说最熟悉不过了,简单易懂,很好上手.但它的缺点是会先将数据全部读入内存,一旦遇到大文件的时候,这种方式读取的效率就非常低下了. 而 fs.createReadStream 则是通过 Stream 来读取

Node.js中的process.nextTick使用实例

  这篇文章主要介绍了Node.js中的process.nextTick使用实例,nextTick函数有什么用.怎么用.和setTimeout有什么区别呢,本文就讲解了这些知识,需要的朋友可以参考下 我已经不记得是在哪里第一次看到process.nextTick这个玩意的调用了,哦,应该是在nodejs官方的process文档里看到的.当时就不理解这东西是干嘛的了,都已经有setTimeout了,还需要这个函数干嘛.而且从根本上来说,这个函数又是干嘛的?和setTimeout有什么区别? sta

在Node.js中使用HTTP上传文件的方法

  这篇文章主要介绍了在Node.js中使用HTTP上传文件的方法,作者以windows下的visual studio作为操作node的环境,推荐阅读!需要的朋友可以参考下 开发环境 我们将使用 Visual Studio Express 2013 for Web 作为开发环境, 不过它还不能被用来做 Node.js 开发.为此我们需要安装 Node.js Tools for Visual Studio. 装好后 Visual Studio Express 2013 for Web 就会转变成一

浅谈Node.js中的定时器

  本文给大家分享的是Node.js中的定时器的相关资料,十分的全面细致,有需要的小伙伴可以参考下. Node.js中定时器的实现 上一篇博文提到,在Node中timer并不是通过新开线程来实现的,而是直接在event loop中完成.下面通过几个JavaScript的定时器示例以及Node相关源码来分析在Node中,timer功能到底是怎么实现的. JavaScript中定时器功能的特点 无论是Node还是浏览器中,都有setTimeout和setInterval这两个定时器函数,并且其工作特

关于在 Node.js 中引用模块,知道这些就够了

本文讲的是关于在 Node.js 中引用模块,知道这些就够了, Node 提供了两个核心模块来管理模块依赖: require 模块在全局范围内可用,不需要写 require('require'). module 模块同样在全局范围内可用,不需要写 require('module'). 你可以将 require 模块理解为命令,将 module 模块理解为所有引入模块的组织者. 在 Node 中引入一个模块其实并不是个多么复杂的概念. const config = require('/path/t

Node.js中Process.nextTick()和setImmediate()的区别

一.Webstrom使用node.js IDE的问题 在区别这两个函数之前来说一下Webstrom使用node.js IDE的问题,在配置Node.js的IDE了,但setImmediate().require等这些node.js中内置的函数时并不会有提示,要解决这个问题只要设置一下Webstrom,File---->Setting---->Languages&Frameworks------>Node.js and NPM中的Node.js core Library is no

Node.js中内存泄漏分析

内存泄漏(Memory Leak)指由于疏忽或错误造成程序未能释放已经不再使用的内存的情况.如果内存泄漏的位置比较关键,那么随着处理的进行可能持有越来越多的无用内存,这些无用的内存变多会引起服务器响应速度变慢,严重的情况下导致内存达到某个极限(可能是进程的上限,如 v8 的上限:也可能是系统可提供的内存上限)会使得应用程序崩溃. 传统的 C/C++ 中存在野指针,对象用完之后未释放等情况导致的内存泄漏.而在使用虚拟机执行的语言中如 Java.JavaScript 由于使用了 GC (Garbag