千家信息网

Node.js中如何解决“背压”问题

发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,Node.js中如何解决"背压"问题,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。Node.js的4种stream是什么
千家信息网最后更新 2025年02月01日Node.js中如何解决“背压”问题

Node.js中如何解决"背压"问题,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

Node.js的4种stream是什么?什么是背压问题?把一个东西从 A 搬到 B 该怎么搬呢?

抬起来,移动到目的地,放下不就行了么。

那如果这个东西有一吨重呢?

那就一部分一部分的搬。

其实 IO 也就是搬东西,包括网络的 IO、文件的 IO,如果数据量少,那么直接传送全部内容就行了,但如果内容特别多,一次性加载到内存会崩溃,而且速度也慢,这时候就可以一部分一部分的处理,这就是流的思想。

各种语言基本都实现了 stream 的 api,Node.js 也是,stream api 是比较常用的

流的直观感受

从一个地方流到另一个地方,显然有流出的一方和流入的一方,流出的一方就是可读流(readable),而流入的一方就是可写流(writable)。

当然,也有的流既可以流入又可以流出,这种叫做双工流(duplex)

既然可以流入又可以流出,那么是不是可以对流入的内容做下转换再流出呢,这种流叫做转换流(transform)

duplex 流的流入和流出内容不需要相关,而 transform 流的流入和流出是相关的,这是两者的区别。

流的 api

Node.js 提供的 stream 就是上面介绍的那 4 种:

const stream = require('stream');// 可读流const Readable = stream.Readable;// 可写流const Writable = stream.Writable;// 双工流const Duplex = stream.Duplex;// 转换流const Transform = stream.Transform;

它们都有要实现的方法:

  • Readable 需要实现 _read 方法来返回内容

  • Writable 需要实现 _write 方法来接受内容

  • Duplex 需要实现 _read 和 _write 方法来接受和返回内容

  • Transform 需要实现 _transform 方法来把接受的内容转换之后返回

我们分别来看一下:

Readable

Readable 要实现 _read 方法,通过 push 返回具体的数据。

const Stream = require('stream');const readableStream = Stream.Readable();readableStream._read = function() {    this.push('阿门阿前一棵葡萄树,');    this.push('阿东阿东绿的刚发芽,');    this.push('阿东背着那重重的的壳呀,');    this.push('一步一步地往上爬。')    this.push(null);}readableStream.on('data', (data)=> {    console.log(data.toString())});readableStream.on('end', () => {    console.log('done~');});

当 push 一个 null 时,就代表结束流。

执行效果如下:

创建 Readable 也可以通过继承的方式:

const Stream = require('stream');class ReadableDong extends Stream.Readable {    constructor() {        super();    }    _read() {        this.push('阿门阿前一棵葡萄树,');        this.push('阿东阿东绿的刚发芽,');        this.push('阿东背着那重重的的壳呀,');        this.push('一步一步地往上爬。')        this.push(null);    }}const readableStream = new ReadableDong();readableStream.on('data', (data)=> {    console.log(data.toString())});readableStream.on('end', () => {    console.log('done~');});

可读流是生成内容的,那么很自然可以和生成器结合:

const Stream = require('stream');class ReadableDong extends Stream.Readable {    constructor(iterator) {        super();        this.iterator = iterator;    }    _read() {        const next = this.iterator.next();        if(next.done) {            return this.push(null);        } else {            this.push(next.value)        }    }}function *songGenerator() {    yield '阿门阿前一棵葡萄树,';    yield '阿东阿东绿的刚发芽,';    yield '阿东背着那重重的的壳呀,';    yield '一步一步地往上爬。';}const songIterator = songGenerator();const readableStream = new ReadableDong(songIterator);readableStream.on('data', (data)=> {    console.log(data.toString())});readableStream.on('end', () => {    console.log('done~');});

这就是可读流,通过实现 _read 方法来返回内容。

Writable

Writable 要实现 _write 方法,接收写入的内容。

const Stream = require('stream');const writableStream = Stream.Writable();writableStream._write = function (data, enc, next) {   console.log(data.toString());   // 每秒写一次   setTimeout(() => {       next();   }, 1000);}writableStream.on('finish', () => console.log('done~'));writableStream.write('阿门阿前一棵葡萄树,');writableStream.write('阿东阿东绿的刚发芽,');writableStream.write('阿东背着那重重的的壳呀,');writableStream.write('一步一步地往上爬。');writableStream.end();

接收写入的内容,打印出来,并且调用 next 来处理下一个写入的内容,这里调用 next 是异步的,可以控制频率。

跑了一下,确实可以正常的处理写入的内容:

这就是可写流,通过实现 _write 方法来处理写入的内容。

Duplex

Duplex 是可读可写,同时实现 _read 和 _write 就可以了

const Stream = require('stream');var duplexStream = Stream.Duplex();duplexStream._read = function () {    this.push('阿门阿前一棵葡萄树,');    this.push('阿东阿东绿的刚发芽,');    this.push('阿东背着那重重的的壳呀,');    this.push('一步一步地往上爬。')    this.push(null);}duplexStream._write = function (data, enc, next) {    console.log(data.toString());    next();}duplexStream.on('data', data => console.log(data.toString()));duplexStream.on('end', data => console.log('read done~'));duplexStream.write('阿门阿前一棵葡萄树,');duplexStream.write('阿东阿东绿的刚发芽,');duplexStream.write('阿东背着那重重的的壳呀,');duplexStream.write('一步一步地往上爬。');duplexStream.end();duplexStream.on('finish', data => console.log('write done~'));

整合了 Readable 流和 Writable 流的功能,这就是双工流 Duplex。

Transform

Duplex 流虽然可读可写,但是两者之间没啥关联,而有的时候需要对流入的内容做转换之后流出,这时候就需要转换流 Transform。

Transform 流要实现 _transform 的 api,我们实现下对内容做反转的转换流:

const Stream = require('stream');class TransformReverse extends Stream.Transform {  constructor() {    super()  }  _transform(buf, enc, next) {    const res = buf.toString().split('').reverse().join('');    this.push(res)    next()  }}var transformStream = new TransformReverse();transformStream.on('data', data => console.log(data.toString()))transformStream.on('end', data => console.log('read done~'));transformStream.write('阿门阿前一棵葡萄树');transformStream.write('阿东阿东绿的刚发芽');transformStream.write('阿东背着那重重的的壳呀');transformStream.write('一步一步地往上爬');transformStream.end()transformStream.on('finish', data => console.log('write done~'));

跑了一下,效果如下:

流的暂停和流动

我们从 Readable 流中获取内容,然后流入 Writable 流,两边分别做 _read 和 _write 的实现,就实现了流动。

背压

但是 read 和 write 都是异步的,如果两者速率不一致呢?

如果 Readable 读入数据的速率大于 Writable 写入速度的速率,这样就会积累一些数据在缓冲区,如果缓冲的数据过多,就会爆掉,会丢失数据。

而如果 Readable 读入数据的速率小于 Writable 写入速度的速率呢?那没关系,最多就是中间有段空闲时期。

这种读入速率大于写入速率的现象叫做"背压",或者"负压"。也很好理解,写入段压力比较大,写不进去了,会爆缓冲区,导致数据丢失。

这个缓冲区大小可以通过 readableHighWaterMark 和 writableHightWaterMark 来查看,是 16k。

解决背压

怎么解决这种读写速率不一致的问题呢?

当没写完的时候,暂停读就行了。这样就不会读入的数据越来越多,驻留在缓冲区。

readable stream 有个 readableFlowing 的属性,代表是否自动读入数据,默认为 true,也就是自动读入数据,然后监听 data 事件就可以拿到了。

当 readableFlowing 设置为 false 就不会自动读了,需要手动通过 read 来读入。

readableStream.readableFlowing = false;let data;while((data = readableStream.read()) != null) {    console.log(data.toString());}

但自己手动 read 比较麻烦,我们依然可以用自动流入的方式,调用 pause 和 resume 来暂停和恢复就行了。

当调用 writable stream 的 write 方法的时候会返回一个 boolean 值代表是写入了目标还是放在了缓冲区:

  • true: 数据已经写入目标

  • false:目标不可写入,暂时放在缓冲区

我们可以判断返回 false 的时候就 pause,然后等缓冲区清空了就 resume:

const rs = fs.createReadStream(src);const ws = fs.createWriteStream(dst);rs.on('data', function (chunk) {    if (ws.write(chunk) === false) {        rs.pause();    }});rs.on('end', function () {    ws.end();});ws.on('drain', function () {    rs.resume();});

这样就能达到根据写入速率暂停和恢复读入速率的功能,解决了背压问题。

pipe 有背压问题么?

平时我们经常会用 pipe 来直接把 Readable 流对接到 Writable 流,但是好像也没遇到过背压问题,其实是 pipe 内部已经做了读入速率的动态调节了。

const rs = fs.createReadStream(src);const ws = fs.createWriteStream(dst);rs.pipe(ws);

总结

流是传输数据时常见的思想,就是一部分一部分的传输内容,是文件读写、网络通信的基础概念。

Node.js 也提供了 stream 的 api,包括 Readable 可读流、Writable 可写流、Duplex 双工流、Transform 转换流。它们分别实现 _read、_write、_read + _write、_transform 方法,来做数据的返回和处理。

创建 Readable 对象既可以直接调用 Readable api 创建,然后重写 _read 方法,也可以继承 Readable 实现一个子类,之后实例化。其他流同理。(Readable 可以很容易的和 generator 结合)

当读入的速率大于写入速率的时候就会出现"背压"现象,会爆缓冲区导致数据丢失,解决的方式是根据 write 的速率来动态 pause 和 resume 可读流的速率。pipe 就没有这个问题,因为内部做了处理。

流是掌握 IO 绕不过去的一个概念,而背压问题也是流很常见的问题,遇到了数据丢失可以考虑是否发生了背压。希望这篇文章能够帮大家理清思路,真正掌握 stream!

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注行业资讯频道,感谢您对的支持。

阿东 内容 数据 速率 方法 问题 就是 缓冲 缓冲区 背着 葡萄 阿门 处理 时候 一方 双工 东西 代表 方式 目标 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 目前最好的软件开发过程模型 jsp项目如何修改数据库 信息工程和软件开发有什么区别 滨州陶瓷软件开发公司 奖学金评定系统数据库ppt 中国移动宽带有自己的服务器 移动软件开发就业方向 系统软件开发需要多少钱 落实网络安全工作的报告 设计师和软件开发哪个工资高 金蝶服务器维修多少钱 数据库是单用户但是无法分离 数据库运行一段时间后连接失败 武威网络安全工程师书籍 我的世界服务器怎么买地皮 计算机网络技术怎么打开交换机 数据库至少包括哪两个文件 网络安全导论试题 长江大学计算机网络技术 王者怎么清除不登录的服务器 萨摩耶互联网金融科技 上海大型软件开发生产厂家 his服务器 高防云服务器价格高防云服务器 MySQL指定数据库文件 数据库学生表dept是什么 微信软件开发免费最新版 php数据库查询效率 软件开发店铺名称大全 反编译apk更改游戏服务器
0