用 Stream 编程
缓冲模式和流模式
-
缓冲模式(buffer mode),在这种模式下系统会把某份资源传来的所有数据,都先收集到一个缓冲区里,直到操作完成为止。然后,系统把这些数据当成一个模块回传给调用方。比如
fs.writeFile、fs.readFile等; -
流模式(stream mode),在流模式下,系统会把自己从 资源端收到的每一块新数据都立刻传给消费方,让后者有机会立刻处理该数据。
假如我们要读取一份特别庞大的文件,这份文件有好几个GB大小,这种情况下如果使用缓冲模式是相当糟糕的,而且 V8 引擎对缓冲区的尺寸是有限制的,你可能根本没办法分配一个高达好几GB的缓冲区,因此有可能还谈不到物理内存耗尽的问题,你在分配缓冲区的这个环节就已经被卡住了。
在 Node.js 中可以通过
buffer.constants.MAX_LENGTH查看某套开发环境最多可支持多少字节的缓冲区。
用缓冲模式的API把文件压缩成GZIP格式
import { gzip } from 'node:zlib';
import { promisify } from 'node:util';
import { promises as fs } from 'node:fs';
const gzipPromise = promisify(gzip);
const filename = process.argv[2];
async function main() {
const data = await fs.readFile(filename);
const gzippedData = await gzipPromise(data);
await fs.writeFile(`${filename}.gz`, gzippedData);
console.log('File successfully compressed');
}
main();
用流模式的API把文件压缩成GZIP格式
import { createGzip } from 'node:zlib';
import { createReadStream, createWriteStream } from 'node:fs';
const filename = process.argv[2];
createReadStream(filename)
.pipe(createGzip())
.pipe(createWriteStream(`${filename}.gz`))
.on('finish', () => console.log('File successfully compressed'));
流对象结构
在 Node.js 平台里面每一种流对象,在类型上都属于下面这四个基本抽象类中的一个,这些类是由 stream 核心模块提供的:
ReadableWritableDuplexTransform
每个 stream 类的对象,本身也都是一个 EventEmmiter 实例,所有流对象实际上可以触发许多事件,比如:
Readable流在读取完毕时会触发end事件;Writable流在写入完毕后会触发finish事件;- 如果操作过程中发生错误,则会触发
error事件;
流不仅可以处理二进制数据,而且几乎能处理任何一种 JavaScript 值。流对象的操作模式可以分成两种:
- 二进制模式(Binary mode):以chunk的形式串流数据,这种模式可以用来处理缓冲或者字符串;
- 对象模式(Object mode):以对象序列的形式串流数据(这意味着我们几乎能处理任何一种 JavaScript 值),因此可以像函数式编程那样,把各种处理环节分别表示成相应的流对象,并把这些对象组合起来(比如
Rxjs这个库);
Readable 流(可读流)
要通过 Readable 流来读取数据,有两种办法可以考虑:非流动模式(non-flowing),也叫暂停模式,另一种是流动模式(flowing)。
非流动模式
下面代码实现了一款简单的程序,把标准输入端(这也是一种 Readable 流)的内容读取进来,并将读到的东西回显到标准输出端。
process.stdin.on('readable', () => {
let chunk: Buffer | null;
console.log('New data available');
while((chunk = process.stdin.read()) !== null) {
// 回显
console.log(
`Chunk read(${chunk.length} bytes: "${chunk.toString()}")`
);
}
}).on('end', () => {
// Windows 上是 Ctrl+Z,Linux和Mac上是 Ctrl+D
console.log('End of stream');
});
readable 一旦发生(按下 Enter 键),就说明有新的数据可以读取了。
process.stdin.read() 方法是一项同步操作,会从 Readable 流内部缓冲区里面提取一块数据,这种模式下让我们可以根据需要,从流对象里面提取数据。
流动模式
流动模式下,我们不通过 read() 方法提取数据,而是等着流对象把数据推送到 data 监听器里面,只要流对象拿到数据,它就会推过来。上面的代码改为流动模式,就应该这么写:
process.stdin.on('data', (chunk) => {
console.log('New data available');
console.log(
`Chunk read(${chunk.length} bytes: "${chunk.toString()}")`
);
})
.on('end', () => {
console.log('End of stream');
});