跳到主要内容

用 Stream 编程

· 阅读需 19 分钟

缓冲模式和流模式

  • 缓冲模式(buffer mode),在这种模式下系统会把某份资源传来的所有数据,都先收集到一个缓冲区里,直到操作完成为止。然后,系统把这些数据当成一个模块回传给调用方。比如 fs.writeFilefs.readFile 等;

  • 流模式(stream mode),在流模式下,系统会把自己从资源端收到的每一块新数据都立刻传给消费方,让后者有机会立刻处理该数据。

假如我们要读取一份特别庞大的文件,这份文件有好几个GB大小,这种情况下如果使用缓冲模式是相当糟糕的,而且 V8 引擎对缓冲区的尺寸是有限制的,你可能根本没办法分配一个高达好几GB的缓冲区,因此有可能还谈不到物理内存耗尽的问题,你在分配缓冲区的这个环节就已经被卡住了。

在 Node.js 中可以通过 buffer.constants.MAX_LENGTH 查看某套开发环境最多可支持多少字节的缓冲区。

用缓冲模式的API把文件压缩成GZIP格式

import { gzip } from 'node:zlib';
import { promisify } from 'node:util';
import { promises as fs } from 'node:fs';

const gzipPromise = promisify(gzip);

const filename = process.argv[2];

async function main() {
const data = await fs.readFile(filename);
const gzippedData = await gzipPromise(data);
await fs.writeFile(`${filename}.gz`, gzippedData);
console.log('File successfully compressed');
}

main();

用流模式的API把文件压缩成GZIP格式

import { createGzip } from 'node:zlib';
import { createReadStream, createWriteStream } from 'node:fs';

const filename = process.argv[2];

createReadStream(filename)
.pipe(createGzip())
.pipe(createWriteStream(`${filename}.gz`))
.on('finish', () => console.log('File successfully compressed'));

流对象结构

Node.js 平台里面每一种流对象,在类型上都属于下面这四个基本抽象类中的一个,这些类是由 stream 核心模块提供的:

  • Readable
  • Writable
  • Duplex
  • Transform

每个 stream 类的对象,本身也都是一个 EventEmmiter 实例,所有流对象实际上可以触发许多事件,比如:

  • Readable 流在读取完毕时会触发 end 事件;
  • Writable 流在写入完毕后会触发 finish 事件;
  • 如果操作过程中发生错误,则会触发 error 事件;

流不仅可以处理二进制数据,而且几乎能处理任何一种 JavaScript 值。流对象的操作模式可以分成两种:

  • 二进制模式(Binary mode):以chunk的形式串流数据,这种模式可以用来处理缓冲或者字符串;
  • 对象模式(Object mode):以对象序列的形式串流数据(这意味着我们几乎能处理任何一种 JavaScript 值),因此可以像函数式编程那样,把各种处理环节分别表示成相应的流对象,并把这些对象组合起来(比如 Rxjs 这个库);

Readable 流(可读流)

要通过 Readable 流来读取数据,有两种办法可以考虑:非流动模式(non-flowing),也叫暂停模式,另一种是流动模式(flowing)。

非流动模式

下面代码实现了一款简单的程序,把标准输入端(这也是一种 Readable 流)的内容读取进来,并将读到的东西回显到标准输出端。

process.stdin.on('readable', () => {
let chunk: Buffer | null;
console.log('New data available');
while((chunk = process.stdin.read()) !== null) {
// 回显
console.log(
`Chunk read(${chunk.length} bytes: "${chunk.toString()}")`
);
}
}).on('end', () => {
// Windows 上是 Ctrl+Z,Linux和Mac上是 Ctrl+D
console.log('End of stream');
});

readable 一旦发生(按下 Enter 键),就说明有新的数据可以读取了。

process.stdin.read() 方法是一项同步操作,会从 Readable 流内部缓冲区里面提取一块数据,这种模式下让我们可以根据需要,从流对象里面提取数据。

流动模式

流动模式下,我们不通过 read() 方法提取数据,而是等着流对象把数据推送到 data 监听器里面,只要流对象拿到数据,它就会推过来。上面的代码改为流动模式,就应该这么写:

process.stdin.on('data', (chunk) => {
console.log('New data available');
console.log(
`Chunk read(${chunk.length} bytes: "${chunk.toString()}")`
);
})
.on('end', () => {
console.log('End of stream');
});

实现自己的 Readable 流

自己定制新的 Readable 流,首先必须从 stream 模块里面继承 Readable 原型,然后还必须在自己的这个具体类之中,给 _read([size]) 方法提供实现代码,而这个方法内部又必须 readable.push(chunk) 这种操作向缓冲区里面填入数据。

_read() 方法和 read() 方法不通,后者是给流对象的消费方使用的,而 _read() 方法是我们在定制 stream 子类时必须自己实现的一个方法。一旦流准备好接受更多数据,则 _read() 将在每次调用 this.push(dataChunk) 后再次调用。 _read() 可能会继续从资源中读取并推送数据,直到 readable.push() 返回 false。

比如下面代码,可以生成随机字符串流对象:

import { Readable, ReadableOptions } from 'node:stream';
import Chance from 'chance';

const chance = new Chance();

export class RandomStream extends Readable {
emittedBytes = 0;
constructor(options?: ReadableOptions) {
super(options); // 继承
}

_read(size: number): void {
// 生成长度为 size 的随机字符串
const chunk = chance.string({ length: size });
// 推入内部缓冲区
this.push(chunk, 'utf8');
this.emittedBytes += chunk.length;
// 百分之 5 的概率返回 true,并推入 null
// 这样会给内部缓冲区推入 `EOF`(文件结束),表示这条数据就此结束
if (chance.bool({ likelihood: 5 })) {
this.push(null);
}
}
}

使用 RandomStream

const randomStream = new RandomStream({
highWaterMark: 10
});
randomStream.on('data', (chunk) => {
console.log(`Chunk received (${chunk.length}) bytes: ${chunk.toString()}`);
}).on('end', () => {
console.log(`Produced (${randomStream.emittedBytes}) bytes of radom data`);
});

_read() 函数中接收一个 size 数字类型的参数,它是一个建议参数,意思是说,你最好尊重这个参数,只推入调用方所请求的这么多字节(即 highWaterMark 配置项),当然这只是一个建议,不是强迫你必须这么做。

ReadableOptions 接收的 options 参数可能会有这样一些属性:

  • encoding: 表示流对象按照什么样的编码标准,把缓冲区的数据转化成字符串,它的默认值是 null
  • objectMode: 这个属性是个标志,用来表示对象模式是否启用,它的默认值是 false
  • highWaterMark: 这个属性表示内部缓冲区的数据上限,如果数据所占的字节数已经达到该上限,那么这个流对象就不应该再从数据源之中读取数据了,默认值是 16KB

简化版定制方案

如果定制的流对象比较简单,可以不用专门编写一个类,而是采用简化版的写法来制作 Readable 流。这种写法只需要调用 new Readable(options),并把一个包含 read() 方法的对象传给 options 参数即可。

let emittedBytes = 0;
const randomStream = new Readable({
highWaterMark: 10,
read(size: number): void {
const chunk = chance.string({ length: size });
this.push(chunk, 'utf8');
emittedBytes += chunk.length;
if (chance.bool({ likelihood: 5 })) {
this.push(null);
}
}
});

randomStream.on('data', (chunk) => {
console.log(`Chunk received (${chunk.length}) bytes: ${chunk.toString()}`);
}).on('end', () => {
console.log(`Produced (${emittedBytes}) bytes of radom data`);
});

ObjectMode 模式

有个叫做 Readable.from() 的辅助函数,让你能够把数组或者生成器、迭代器以及异步迭代器这样的 iterable 对象当做数据源,轻松构建 Readable 流。

import { Readable } from 'node:stream';

const arrStream = Readable.from(['a', 'b', 'c', 'd', 'e', 'f', 'g']);

arrStream.on('data', (char: string) => {
console.log("🚀 ~ file: exercise.ts:6 ~ arrStream.on ~ char:", char);
}).on('end', () => {
console.log("end");
});

Writable 流(可写流)

向 Writable 流推送数据,是相当容易的,我们只需要使用 write 方法就行了,方法前面是:

writable.write(chunk, [encoding], [callback])

其中 encoding 参数和 callback 参数是可选的。如果 chunk 是字符串,那么 encoding 参数默认是 utf8,如果 chunk 是 Buffer,那么该参数的值会为系统所忽略。callback 表示这个函数会在系统把数据块写入底层资源的时候,得到调用。

如果想告诉 Writable 流,已经没有数据需要写入了,那么应该调用 end() 方法:

writable.end([chunk], [encoding], [callback]);

下面代码我们创建了一个小的 HTTP 服务器程序,让它输出一些随机字符串:

const chance = new Chance();
const server = createServer((_, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
while(chance.bool({ likelihood: 95 })) {
res.write(`${chance.string()}\n`);
}
res.end('\n\n');
res.on('finish', () => console.log('All data send.'));
});
server.listen(8082, () => {
console.log('listening on http://localhost:8082');
});

res 对象不仅是 http.ServerResponse 实例,同时也是一个 Writable 流。上面代码我们使用 curl localhost:8082 命令就可以看到服务器发来的随机字符串了。

实现 Writable 流

要实现一种新的 Writable 流,我们可以继承 Writable 类,并实现 _write() 方法。

假如我们要实现这样一种 Writable 流,接收下面这种格式的对象:

{
path: <文件路径>
content: <字符串或 buffer>
}

每收到这样一个对象,我们就会把 path 所指的路径下创建一份文件,并把 content 属性的内容存入该文件。

大家应该意识到,输入给我们这种 Writable 流的数据,并不是字符串或Buffer,而应该是对象,因此这种流必须在对象模式下运作。代码如下:

import { dirname } from 'node:path';
import { promises as fs } from 'node:fs';
import { Writable, WritableOptions } from 'node:stream';

interface ChunkType {
path: string;
content: string | Buffer;
}
export class ToFileStream extends Writable {
constructor(options: WritableOptions) {
super({ ...options, objectMode: true });
}

_write(chunk: ChunkType, _encoding: BufferEncoding, callback: (error?: Error) => void) {
// 递归创建多级文件夹,然后写入文件
fs.mkdir(dirname(chunk.path), { recursive: true })
.then(() => fs.writeFile(chunk.path, chunk.content))
.then(() => callback())
.catch(callback);
}
}

使用:

tfs.write({
path: join('files', 'file1.txt'),
content: 'Hello',
});
tfs.write({
path: join('files', 'file2.txt'),
content: 'Node.js',
});
tfs.write({
path: join('files', 'file3.txt'),
content: 'streams',
});

tfs.end(() => console.log('All files created.'));

另外,通 Readable流一样,Writable流也支持简化版的定制方案:

const tfs = new Writable({
objectMode: true,
write(chunk: ChunkType, _encoding: BufferEncoding, callback) {
fs.mkdir(dirname(chunk.path), { recursive: true })
.then(() => fs.writeFile(chunk.path, chunk.content))
.then(() => callback())
.catch(callback);
}
});

backpressure(防拥堵机制)

写入数据的速度可能要比消耗数据的速度要快,为了应对这种情况,流对象会把写进来的数据先放入缓冲区,但如果给该对象写入数据的那个人不知道已经出现这种情况,那么还是会不断地写入,导致内部缓冲区里面的数据越积越多,让内存使用量变得比较高。

为了提醒写入方注意这种问题,writable.write() 方法会在内部缓冲区触碰 highWaterMark(内部缓冲区的数据上限) 上限的时候,返回 false,以表明此时不应该再向其中写入内容。当缓冲区清空时,流对象会触发 drain 事件,以提示现在又可以向里面写入数据了。这套机制就叫做 backpressure(防拥堵机制)。

backpressure 只是一套建议机制,而不是强制实施的。即便 write() 返回 false,我们也还是可以忽略这个信号,继续往里面写入,让缓冲区越变越大。

这套机制其实在 Readable 流中也有类似的体现,在实现 _read() 方法时,如果发现自己调用 push() 方法得到的结果是 false,那就不应该再向其中推送新数据了。这个问题仅仅需要由实现 Readable 流的人来担心,而不太需要由使用这种流的人负责处理。

下面代码实现的是防拥堵机制的输出随机字符 HTTP 服务器:

const chance = new Chance();
const server = createServer((_, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
function generateMore() {
while(chance.bool({ likelihood: 95 })) {
const randomChunk = chance.string({
length: (16 * 1024) - 1
});
const shouldContinue = res.write(`${randomChunk}\n`);
if (!shouldContinue) { // 是否已拥堵
console.log('back-pressure');
// 监听 drain 事件,表面现在又可以向里面写入数据了
return res.on('drain', generateMore);
}
}
res.end('\n\n');
}
generateMore();
res.on('finish', () => console.log('All data send.'));
});
server.listen(8082, () => {
console.log('listening on http://localhost:8082');
});

Duplex(双工流/读写流)

Duplex 流既是 Readable 流,又是 Writable 流,它继承了 ReadableWritable 的方法。因此:

  • Duplex流既可以通过 read() 从流中读取数据,也可以通过 write() 给这样的流里面写入数据;
  • 既可以监听与数据读取有关的 readable 事件,也可以监听与数据写入有关的 drain 事件;
  • 如果要定制 Duplex 流,那么既要实现 _read() 方法,又要实现 _write() 方法;
  • Duplex 构造器 options 参数中有一个 allowHalfOpen 属性,默认是 true,如果设置成 false,那么 Duplex流会在 Readable 端已经关闭的时候,自动把 Writable 端也关闭;
  • 如果我们想让 Duplex流在其中一侧以对象模式运作,而在另一侧以二进制模式运作,那么可以给 readableObjectModewritableObjectMode 分别设置不同的值;

比如 network socket 就是一个典型的双工流,它既可以监听对端发来的数据,也可以往对端写入数据。

// server.js
import { createServer } from 'net';

const server = createServer();
server.on('connection', (socket) => {
// 写入数据
socket.write('Hello!\n');

// 读取数据
socket.on('data', (data) => {
console.log('receive msg: ', data.toString());
});
}).on('close', () => {
console.log('Server closed');
}).listen(3033, () => console.log('Server listening on port 3033');


// client.js
import { createConnection } from 'net';

const client = createConnection({
port: 3033,
host: 'localhost'
});
client.on('data', (data) => {
console.log(data.toString());

client.end('你好~\n');
});

Transform 流(传输流)

Transform 流是一种特殊的 Deplex流,专门用来转换数据。比如 zlib.createGzip()crypto.createCipheriv() 所创建的就是 Transform 流,它们分别用来压缩和加密数据(把原始数据流转成压缩/加密后的数据流)。

对于 Deplex 流,它并不关注读取的数据与写入其中的数据之间有没有联系,比如 TCP Socket(一种 Deplex 流),它 ·只知道自己可以给远端发送数据,并从远端接收数据就行了,至于发出去的数据与收到的数据之间是什么关系,并不需要由它来操心。下图是 Deplex流 和 Transform 流的区别。

Deplex 流与 Transform 流的区别

我们在定制新的 Transform 流时,需要实现的并不是 _read()_write(),而是另一组方法:_transform()_flush()

两个方法的签名:

_transform(chunk: any, encoding: BufferEncoding, callback: (error?: Error | null, data?: any) => void);
_flush(callback: (error?: Error | null, data?: any) => void;);

_transform() 方法签名与 Writable流中的 _write() 方法一样,但是它并不把数据写入到底层资源,而是通过 this.push() 将其推送到内部缓冲区以供读取,这一点跟 Readable 流的 _read() 方法很像。

_flush() 方法是系统会在整条数据流即将结束的时候,触发这个方法,让我们有机会把还没有推送完的数据一次推送过去,以彻底结束这个流的工作。它接收一个回调参数,我们必须在所有操作都执行完毕后,触发这个回调,让这条数据流终止。

比如下面代码定制了一种 Transform 流,让它把数据里面的字母全部变成大写字母。

import { Transform, TransformOptions } from 'node:stream';
import { TransformCallback } from 'stream';

export class UpperStream extends Transform {

totalCharacters = 0;
constructor(options?: TransformOptions) {
super(options);
}

// 外部调用 .write() 时会触发 _transform()方法
_transform(chunk: string, _encoding: BufferEncoding, callback: TransformCallback): void {
// 转换接收到的数据
const upperStr = chunk.toString().toUpperCase();
this.totalCharacters += upperStr.length;
// 把转换后的数据推入内部缓冲区
this.push(upperStr);
callback();
}

// 外部调用 end()时会触发 _flush()方法
_flush(callback: TransformCallback): void {
// this.push(`(${this.totalCharacters} characters in total)`);
callback(null, this.totalCharacters);
}
}

// 使用
const upperStream = new UpperStream();
upperStream.on('data', chunk => {
console.log("🚀 ~ file: replace-stream.ts:25 ~ chunk:", chunk.toString());
});

// 写入数据
upperStream.write('Hello W');
upperStream.write('orld!');
upperStream.end();

PassThrough 流

这是一种特殊的 Transform 流,它不会对输出的数据做任何转换。

如果你想知道有多少数据经过了某一个或者某几个流对象,那么就可以使用 PassThrough 流轻松实现:

import { createGzip } from 'node:zlib';
import { PassThrough } from 'node:stream';
import { resolve } from 'node:path';
import { createReadStream, createWriteStream } from 'node:fs';

let bytesWritten = 0;

const monitor = new PassThrough();
monitor.on('data', (chunk) => {
bytesWritten += chunk.length;
});

monitor.on('finish', () => {
console.log(`${bytesWritten} bytes written.`);
});

const filename = resolve(__dirname, './req.http');

createReadStream(filename)
.pipe(createGzip())
.pipe(monitor)
.pipe(createWriteStream(filename));

如果不使用 PassThrough 流,而是自己定制一种 Transform 流,这种方案下,就必须把收到的数据块照原样尽快推送过去,既不能修改也不能拖延,而使用 PassThrough 流,这一点会由系统自行保证。

pipe 与 pipeline

pipe

把两个流通过 pipe() 方法连接在一起,会形成*吸附(suction)*效应,使得数据能从 readable 流自动进入 writable 流,于是我们就没必要再调用 read() 或 write() 了,而且也不用再担心 backpressure 问题了,因为这会由系统自动处理。

readable.pipe(writable, [options]);

pipe() 方法会把它收到的第一个参数返回给调用方,如果这个 writable 流本身,同时还是一个 Readable 流(比如 Deplex流或者 Transform流),那么调用者就可以继续在这个流上面调用 pipe(),从而形成链条。

pipe() 方法拼接流对象的时候,前一个流所产生的 error 事件,不会自动播报给下一个流,比如下面代码:

stream1
.pipe(stream2)
.on('error', () => {})

上面这种写法只能捕获 stream2 对象所发生的错误,如果我们还想把 stream1 错发生的错误也捕获下来,那必须再写一个监听器,让它直接关注那个对象。

stream1
.on('error', () => {})
.pipe(stream2)
.on('error', () => {})

这种写法显然不够优雅,如果管道里的环节比较多,效果更加糟糕。更严重的是,当某个流对象发生错误的时候,系统会把这个对象从管道里面拿走,我们没有正确的销毁相关的流对象,那么就可能让某些资源出于游离状态(dangling,也叫做悬挂状态),例如可能导致文件描述符或网络连接未能及时关闭,从而发生内存泄露。下面的写法可以让刚才的代码变的健壮一些:

function handleError(err) {
stream1.destroy();
stream2.destroy();
}

stream1
.on('error', handleError)
.pipe(stream2)
.on('error', handleError)

pipeline

pipeline可以让我们构建出更安全的管道,并且能够用更简单的方式处理其中的错误。

import { pipeline } from 'node:stream';
pipeline(stream1, stream2, stream3, ..., (err: Error) => {});

看上面签名就可以知道,可以利用 promisify 函数把 pipeline 转化成 Promise 版本(stream 模块里也直接提供了 promises.pipeline API)。pipeline 函数会在出错之后执行清理工作,并销毁管道之中的所有 stream 对象。

const fs = require('node:fs');
const zlib = require('node:zlib');
const { pipeline } = require('node:stream/promises');

pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
).then(() => {
// TODO...
}).catch(() => {
// TODO...
}).finally(() => {
// TODO...
});

如果某个功能无法单凭某一条 stream 实现,那么可以考虑把多条 stream 连接起来,让它们共同实现这项功能,这很好的体现了“每个模块只做一件事”(one thing per module know)的理念。

EventSource

一个 EventSource 实例会对 HTTP 服务器开启一个持久化的连接,以 text/event-stream 格式发送事件,此连接会一直保持开启直到通过调用 EventSource.close() 关闭。

EventSource 有固定的格式,下面代码是一个简单的 Node.js EventSource 后端代码:

import { URL } from 'node:url';
import { basename, resolve } from 'node:path';
import { createReadStream, statSync } from 'node:fs';
import { spawn, ChildProcessWithoutNullStreams } from 'node:child_process';
import { createServer, ServerResponse, IncomingMessage } from 'node:http';

const PORT = 8848;
const staticPath = resolve(__dirname, 'static');

function handleCpsErrors(cps: ChildProcessWithoutNullStreams, req: IncomingMessage, res: ServerResponse) {
cps.on('error', (error) => {
console.error('Ping command error:', error);
res.statusCode = 500;
res.end('Internal Server Error');
});
req.on('close', () => {
cps.kill(); // 客户端连接关闭时,杀死 ping 命令子进程
});
};

const http = createServer((req, res) => {
// 取出查询参数
const url = new URL(req.url, `http://localhost:${PORT}`);
const query = url.searchParams;
const method = (req.method || 'GET').toUpperCase();
const { pathname } = url;

if (pathname === '/ping' && method === 'GET') {
const pingHost = query.get('host');
const pingCount = query.get('count') || '4';
const pingSpawn = spawn('ping', [pingHost, '-c', pingCount]);

handleCpsErrors(pingSpawn, req, res);
// 设置响应标头
res.setHeader('Content-Type', 'text/event-stream');

res.statusCode = 200;

pingSpawn.stdout.setEncoding('utf-8');
res.write('event: message\n');
pingSpawn.stdout
.on('data', (data: string) => {
const dataBlocks = data.split('\n');
dataBlocks.forEach((block) => {
res.write(`data: ${block}\n\n`);
});
}).on('end', () => {
res.end(`data: [DONE]\n\n`);
});
}
});

http.listen(PORT, () => {
console.log(`Server listening on http://localhost:${PORT}`);
});

EventSource 有固定的消息格式:event: message\ndata: xxx\n\n,前者代表在前端监听消息的时间,这里使用 message 作为事件名称;而后者就代表发送的数据,需要注意是,每条数据需要使用 \n\n 换行符分割(约定的格式)。在前端我们就可以请求 /ping,监听 message 事件获取 data 了!

const source = new EventSource(`/ping?host=${host}&count=6`);

// 如果后端不是使用的 `event: message`,
// 而是使用了 `event: msg`,你需要使用
// `source.addEventListener('msg', (event) => { ... })` 来监听事件
source.onmessage = (event) => {
if (event.data === '[DONE]') {
return source.close();
}
element.textContent += `${event.data}\n`;
}