學習流的作用以及為什麼它們如此重要以及如何使用它們。

什麼是流

流是 Node.js 應用程序的基本概念之一。它們是一種以高效方式處理讀取/寫入文件、網絡通信或任何類型的端到端信息交換的方法。

流不是 Node.js 獨有的概念。它們在數十年前引入了 Unix 操作系統,程序可以通過管道運算符(|)相互交互地傳遞流。

例如, 在傳統的方式中,當您要求程序讀取一個文件時,文件被從頭到尾讀入內存,然後進行處理。

使用流,您可以分塊讀取它,處理其內容,而無需將其全部保留在內存中。

Node.js 的 stream 模塊 提供了所有流 API 的基礎。

為什麼使用流

流主要提供了兩個優勢,與其他數據處理方法相比:

  • 內存效率:您不需要在能夠處理數據之前將大量數據加載到內存中。
  • 時間效率:只要有數據可用,開始處理數據所需的時間遠小於等到整個數據有效負載可用時的時間。

流的一個例子

一個典型的例子是從磁盤讀取文件。

使用 Node fs 模塊,當建立一個新的連接到您的 HTTP 服務器時,您可以讀取一個文件並在 HTTP 上提供它:

const http = require('http');
const fs = require('fs');

const server = http.createServer(function (req, res) {
  fs.readFile(__dirname + '/data.txt', (err, data) => {
    res.end(data);
  });
});
server.listen(3000);

readFile() 會讀取文件的全部內容,並在完成時調用回調函數。

回調函數中的 res.end(data) 將文件內容返回給 HTTP 客戶端。

如果文件很大,此操作將需要相當長的時間。以下是使用流實現相同功能的代碼:

const http = require('http');
const fs = require('fs');

const server = http.createServer((req, res) => {
  const stream = fs.createReadStream(__dirname + '/data.txt');
  stream.pipe(res);
});
server.listen(3000);

我們不再等待文件完全讀取,而是在有數據塊準備好發送給 HTTP 客戶端時立即開始將其串流傳送。

pipe()

上面的例子使用了 stream.pipe(res)pipe() 方法是在文件流上調用的。

這段代碼是做什麼的?它將源導入目標。

您在源流上調用它,因此在這種情況下,文件流被導入 HTTP 響應。

pipe() 方法的返回值是目標流,這是一個非常方便的功能,它使我們能夠鏈接多個 pipe() 調用,如下所示:

src.pipe(dest1).pipe(dest2);

此構造與以下操作相同:

src.pipe(dest1);
dest1.pipe(dest2);

基於流的 Node.js API

由於其優勢,許多 Node.js 核心模塊提供了本地流處理功能,其中最重要的包括:

  • process.stdin 返回連接到 stdin 的流
  • process.stdout 返回連接到 stdout 的流
  • process.stderr 返回連接到 stderr 的流
  • fs.createReadStream() 創建讀取文件的可讀流
  • fs.createWriteStream() 創建寫入文件的可寫流
  • net.connect() 創建基於流的連接
  • http.request() 返回 http.ClientRequest 類的實例,這是一個可寫流
  • zlib.createGzip() 將數據使用 gzip(壓縮算法)壓縮為流
  • zlib.createGunzip() 解壓縮 gzip 流。
  • zlib.createDeflate() 將數據使用 deflate(壓縮算法)壓縮為流
  • zlib.createInflate() 解壓縮 deflate 流

不同類型的流

有四個流類別:

  • Readable:可從中連接,但無法連接到其中(可以接收數據,但無法發送數據)。將數據推入可讀流時,它會被緩衝,直到有使用者開始讀取數據。
  • Writable:可以連接到其中,但無法從中連接(可以發送數據,但無法從中接收數據)。
  • Duplex:既可以連接到其中,也可以從中連接,基本上是可讀流和可寫流的組合。
  • Transform:與 Duplex 流類似,但其輸出是其輸入的轉換

如何創建可讀流

我們從 stream 模塊 中獲取可讀流,然後初始化它:

const Stream = require('stream');
const readableStream = new Stream.Readable();

現在,流已經初始化,我們可以將數據傳送給它:

readableStream.push('hi!');
readableStream.push('ho!');

如何創建可寫流

要創建可寫流,我們擴展基礎的 Writable 對象,並實現其 _write() 方法。

首先創建一個流對象:

const Stream = require('stream');
const writableStream = new Stream.Writable();

然後實現 _write 方法:

writableStream._write = (chunk, encoding, next) => {
  console.log(chunk.toString());
  next();
};

現在,您可以將可讀流引入可寫流:

process.stdin.pipe(writableStream);

如何從可讀流中獲取數據

如何從可讀流讀取數據?使用可寫流:

const Stream = require('stream');

const readableStream = new Stream.Readable();
const writableStream = new Stream.Writable();

writableStream._write = (chunk, encoding, next) => {
  console.log(chunk.toString());
  next();
};

readableStream.pipe(writableStream);

readableStream.push('hi!');
readableStream.push('ho!');

您還可以直接使用 readable 事件來消耗可讀流:

readableStream.on('readable', () => {
  console.log(readableStream.read());
});

如何將數據發送到可寫流

使用流的 write() 方法:

writableStream.write('hey!\n');

告訴可寫流您已經結束寫入

使用 end() 方法:

const Stream = require('stream');

const readableStream = new Stream.Readable();
const writableStream = new Stream.Writable();

writableStream._write = (chunk, encoding, next) => {
  console.log(chunk.toString());
  next();
};

readableStream.pipe(writableStream);

readableStream.push('hi!');
readableStream.push('ho!');

writableStream.end();