使用流式API,我們可以在第一個位元抵達時,立即從網路或其他來源接收資源並處理它。

不必等待資源完全下載後再使用它,可以立即開始處理。

什麼是流?

首先遇到的例子是載入YouTube視頻 - 在您開始觀看它之前,您不必完全載入它。

或者是直播,您甚至不知道內容何時結束。

內容甚至不必結束,它可以無限生成。

Streams API(流API)

Streams API允許我們處理這種類型的內容。

我們有兩種不同的流模式:從流程讀取和寫入流程。

可讀流在除了Internet Explorer之外的所有現代瀏覽器中都可用。

可寫流在Firefox和Internet Explorer中不可用。

一如既往,請參閱caniuse.com上有關此問題的最新信息。

讓我們從可讀流開始

可讀流

當涉及到可讀流時,我們有三種對象類別:

  • ReadableStream
  • ReadableStreamDefaultReader
  • ReadableStreamDefaultController

我們可以使用ReadableStream對像來消耗流。

這是第一個可讀流的例子。Fetch API允許從網絡中獲取資源並將其作為流程提供。

const stream = fetch('/resource')
 .then(response => response.body)

fetch響應的body屬性是ReadableStream對象實例。這就是我們的可讀流。

閱讀器

在ReadableStream對像上調用getReader()方法會返回ReadableStreamDefaultReader對像,即閱讀器。我們可以按如下方式獲取:

const reader = fetch('/resource').then(response => response.body.getReader())

我們按塊讀取數據,其中一個塊是一個位元組或一個類型化數組。將塊排入流程,並逐個塊讀取它們。

單個流程可以包含不同類型的块。

一旦我們有了ReadableStreamDefaultReader對象,我們可以使用read()方法訪問數據。

只要創建了閱讀器,流程就鎖定了,其他讀者不能從它獲取塊,直到我們在閱讀器上調用releaseLock()。

您可以使用tee()方法來實現這種效果,後面將更詳細介紹。

從可讀流讀取數據

一旦我們有了ReadableStreamDefaultReader對象實例,就可以從中讀取數據。

以下是如何按字節(由於CORS原因,您可以在打開了該頁面的DevTools窗口中執行此操作)逐個字節讀取flaviocopes.com網頁的首個块的數據的方法。

fetch('https://flaviocopes.com/')
 .then(response => {
 response.body
 .getReader()
 .read()
 .then(({value, done}) => {
 console.log(value)
 })
 })

Uint8Array

如果你打開每組數組項,你會得到單個項目。這些都是以Uint8Array存儲的字元:

bytes stored in Uint8Array

您可以使用Encoding API將這些字節轉換為字符:

const decoder = new TextDecoder('utf-8')
fetch('https://flaviocopes.com/')
 .then(response => {
 response.body
 .getReader()
 .read()
 .then(({value, done}) => {
 console.log(decoder.decode(value))
 })
 })

這個版本的代碼會加載流的每個塊並將其打印出來:

(async () => {
 const fetchedResource = await fetch('https://flaviocopes.com/')
 const reader = await fetchedResource.body.getReader()

 let charsReceived = 0
 let result = ''

 reader.read().then(function processText({ done, value }) {
 if (done) {
 console.log('Stream finished. Content received:')
 console.log(result)
 return
 }

 console.log(`Received ${result.length} chars so far!`)

 result += value

 return reader.read().then(processText)
 })
})()

我將其封裝在了一個立即調用的async函數中以使用await。

我們創建的processText()函數接收一個帶有2個屬性的對象。

  • 如果流結束並且我們獲得了所有數據,則done為true
  • value是當前接收的塊的值

我們創建了這個遞歸函數來處理整個流。

創建stream(創建流)

警告:Edge和Internet Explorer不支持

我們剛才看到了如何使用Fetch API生成的可讀流,這是使用流來開始工作的一種很好的方式,因為場景是實際的。

現在讓我們看看如何創建一個可讀流,以便我們可以用我們的代碼訪問資源。

我們之前已經使用了ReadableStream對像。現在讓我們使用new關鍵字創建一個全新的ReadableStream:

const stream = new ReadableStream()

這個流現在沒有什麼用處。它是一個空的流,如果有人想要讀取它,那就沒有數據。

我們可以在初始化期間通過傳遞一個對像來定義流的行為方式。這個對象可以定義以下屬性:

  • start:當創建可讀流時調用的函數。在這裡,您可以連接到資料源並執行管理任務。
  • pull:在內部隊列高水位標達到之前,重複調用的函數以獲取數據。
  • cancel:當流被取消時調用的函數,例如在接收端上調用cancel()方法時。

以下是對象結構的基本示例:

const stream = new ReadableStream({
 start(controller) {

 },
 pull(controller) {

 },
 cancel(reason) {

 }
})

start()和pull()獲得了控制器對像,它是ReadableStreamDefaultController對像的實例,它使您可以控制流狀態和內部隊列。

要將數據添加到流中,我們調用controller.enqueue(),並傳遞保存我們的數據的變量:

const stream = new ReadableStream({
 start(controller) {
 controller.enqueue('Hello')
 }
})

當我們準備關閉流時,我們調用controller.close()。

cancel()得到一個字符串,當取消流時提供給ReadableStream.cancel()方法調用。

我們還可以傳遞一個可選的第二個對象,該對象確定排隊策略。它包含2個屬性:

  • highWaterMark:可以存儲在內部隊列中的塊的總數量。在之前講到pull()時,我們提到過這一點。
  • size:一種方法,您可以使用它來更改塊大小,以字節表示。

以下是它的結構:

{
 highWaterMark,
 size()
}

這些主要用於控制流上的壓力,尤其是在Web API的pipe鏈的上下文中,這還是實驗階段。

當流的highWaterMark值達到時,會將壓力信號發送到管道中的前一個流,以告訴它們減緩數據壓力。

我們有兩個內建對象定義了排隊策略:

  • ByteLengthQueuingStrategy:等待塊的累積大小(以字節為單位)超過指定的高水位標。
  • CountQueuingStrategy:等待塊的累積數量超過指定的高水位標。

設置32字節的高水位標的示例:

new ByteLengthQueuingStrategy({ highWaterMark: 32 * 1024 }

設置1塊高水位標的示例:

new CountQueuingStrategy({ highWaterMark: 1 })

我提到這一點是告訴您,您可以控制流中流動的數據量,並與其他參與方通信,但我們不會深入研究更多細節,因為事情變得相當複雜。

拆分流

我之前提到,一旦我們開始讀取流,它就被鎖定了,其他讀者無法訪問它,直到我們在其上調用releaseLock()。

但是,我們可以使用流程本身的tee()方法複製流:

const stream = //...
const tees = stream.tee()

tees現在是一個包含兩個新流程的數組,你可以使用tees[0]和tees[1]來從中讀取。

可寫流

當涉及到可寫流時,我們有三種對象類別:

  • WritableStream
  • WritableStreamDefaultReader
  • WritableStreamDefaultController

我們可以創建我們稍後可以使用的流,使用WritableStream對像。

以下是如何創建一個新的可寫流:

const stream = new WritableStream()

為了有用,我們必須傳遞一個對像。該對象將具有以下可選方法的實現:

  • start():初始化對像時調用
  • write():當塊準備好寫入接收方時調用(流被寫入之前的底層結構)
  • close():當我們完成寫入塊時調用
  • abort():當我們想要發送錯誤時調用

以下是藍圖:

const stream = new WritableStream({
 start(controller) {

 },
 write(chunk, controller) {

 },
 close(controller) {

 },
 abort(reason) {

 }
})

start()、close()和write()會被傳遞控制器,即WritableStreamDefaultController對像實例。

與ReadableStream()一樣,我們可以向new WritableStream()傳遞第二個對象,其中設置了queueing策略。

例如,讓我們創建一個流,根據存儲在內存中的字符串創建一個可以連接到的流。

首先,我們定義一個解碼器,我們將使用Encoding API的TextDecoder()構造函數將我們接收到的字節轉換為字符:

const decoder = new TextDecoder("utf-8")

我們可以通過實現close()方法來初始化WritableStream,當消息完全接收並且客戶端代碼調用它時,它將打印到控制台:

const writableStream = new WritableStream({
 write(chunk) {
 //...
 },
 close() {
 console.log(`The message is ${result}`)
 }
})

我們通過初始化ArrayBuffer並將其添加到塊中開始write()實現。然後,我們繼續使用解碼器將這個塊(一個字節)解碼為字符,使用Encoding API的解碼器解碼方法。然後,將此值添加到宣告在此對象之外的“result”字符串中:

let result

const writableStream = new WritableStream({
 write(chunk) {
 const buffer = new ArrayBuffer(2)
 const view = new Uint16Array(buffer)
 view[0] = chunk
 const decoded = decoder.decode(view, { stream: true })
 result += decoded
 },
 close() {
 //...
 }
})

現在,WritableStream對像已經初始化。

現在,我們即將實現將使用此流的客戶端代碼。

首先我們從writableStream對象中獲取WritableStreamDefaultWriter對象:

const writer = writableStream.getWriter()

接下來,我們定義要發送的消息:

const message = 'Hello!'

然後,我們初始化編碼器以將我們要發送到流中的字符進行編碼

const encoder = new TextEncoder()
const encoded = encoder.encode(message, { stream: true })

此時,字符串已經編碼為字節數組。現在,我們使用forEach循環在此數組上發送每個字節到流中。在調用流寫入者的write()方法之前,我們檢查ready屬性,該屬性返回Promise,所以只有在流寫入者準備好時我們才寫入:

encoded.forEach(chunk => {
 writer.ready.then(() => {
 return writer.write(chunk)
 })
})

我們現在只剩下關閉writer。forEach是同步循環,這意味著我們只有在每個項目都被寫入之後才會到達這一點。

我們仍然檢查ready屬性,然後調用close()方法:

writer.ready.then(() => {
 writer.close()
})