Streams API

使用流,我們可以從網絡或其他來源接收資源,並在第一位到達時對其進行處理

使用流,我們可以從網絡或其他來源接收資源,並在第一位到達時立即對其進行處理。

無需等待資源完全下載再使用,我們可以立即使用它。

什麼是流

我想到的第一個示例是加載YouTube視頻-您不必完全加載它就可以開始觀看它。

或實時流式傳輸,甚至不知道內容何時結束。

內容甚至不必結束。它可以無限期生成。

Streams API

Streams API允許我們處理此類內容。

我們有2種不同的流模式:從流讀取和向流寫入。

除Internet Explorer之外,所有現代瀏覽器均提供可讀流。

可寫流在Firefox和Internet Explorer上不可用。

與往常一樣,檢查caniuse.com有關此問題的最新信息。

讓我們從可讀流開始

可讀流

關於可讀流,我們有3類對象:

  • ReadableStream
  • ReadableStreamDefaultReader
  • ReadableStreamDefaultController

我們可以使用ReadableStream對象使用流。

這是可讀流的第一個示例。 Fetch API允許從網絡獲取資源並將其作為流使用:

const stream = fetch('/resource')
  .then(response => response.body)

body提取響應的屬性是ReadableStream對象實例。這是我們可讀的流。

讀者

呼喚getReader()在一個ReadableStream對象返回一個ReadableStreamDefaultReader對象,讀者。我們可以這樣獲得:

const reader = fetch('/resource').then(response => response.body.getReader())

我們分塊讀取數據,其中塊是字節或類型數組。將大塊放入流中,我們一次讀取一個大塊。

單個流可以包含不同種類的塊。

一旦我們有一個ReadableStreamDefaultReader對象,我們可以使用read()方法。

創建閱讀器後,流就被鎖定,其他閱讀器無法從中獲取數據塊,直到我們調用releaseLock()在上面。

您可以準備一個流以實現此效果,稍後將對此進行更多說明。

從可讀流中讀取數據

一旦我們有一個ReadableStreamDefaultReader對象實例,我們可以從中讀取數據。

這樣可以從flaviocopes.com網頁逐字節讀取HTML內容流的第一塊(出於CORS的原因,您可以在該網頁上打開的DevTools窗口中執行此操作)。

fetch('https://flaviocopes.com/')
  .then(response => {
    response.body
      .getReader()
      .read()
      .then(({value, done}) => {
        console.log(value)
      })
  })

Uint8Array

如果打開每組數組項,則將進入到單個項。這些是字節,存儲在Uint8Array

bytes stored in Uint8Array

您可以使用編碼API

const decoder = new TextDecoder('utf-8')
fetch('https://flaviocopes.com/')
  .then(response => {
    response.body
      .getReader()
      .read()
      .then(({value, done}) => {
        console.log(decoder.decode(value))
      })
  })

它將打印出頁面中加載的字符:

Printed characters

此新版本的代碼將加載流的每個塊,並將其打印出來:

(async () => {
  const fetchedResource = await fetch('https://flaviocopes.com/')
  const reader = await fetchedResource.body.getReader()

let charsReceived = 0 let result = ‘’

reader.read().then(function processText({ done, value }) { if (done) { console.log(‘Stream finished. Content received:’) console.log(result) return }

<span style="color:#a6e22e">console</span>.<span style="color:#a6e22e">log</span>(<span style="color:#e6db74">`Received </span><span style="color:#e6db74">${</span><span style="color:#a6e22e">result</span>.<span style="color:#a6e22e">length</span><span style="color:#e6db74">}</span><span style="color:#e6db74"> chars so far!`</span>)

<span style="color:#a6e22e">result</span> <span style="color:#f92672">+=</span> <span style="color:#a6e22e">value</span>

<span style="color:#66d9ef">return</span> <span style="color:#a6e22e">reader</span>.<span style="color:#a6e22e">read</span>().<span style="color:#a6e22e">then</span>(<span style="color:#a6e22e">processText</span>)

}) })()

我把它包在一個async立即調用的功能await

我們創建的processText()函數接收一個具有2個屬性的對象。

  • done如果流結束並且我們獲得了所有數據,則返回true
  • value當前收到的塊的值

我們創建此遞歸函數來處理整個流。

創建流

警告:Edge和Internet Explorer不支持

我們剛剛看到瞭如何使用Fetch API生成的可讀流,這是使用流的一種很好的方式,因為用例很實際。

現在讓我們看一下如何創建可讀流,以便我們可以使用我們的代碼來訪問資源。

我們已經使用了ReadableStream之前的對象。現在,我們使用new關鍵詞:

const stream = new ReadableStream()

現在,此流不是很有用。它是一個空流,如果有人要讀取它,則沒有數據。

我們可以通過在初始化期間傳遞一個對象來定義流的行為。該對象可以定義那些屬性:

  • start創建可讀流時調用的函數。在這裡,您可以連接到數據源並執行管理任務。
  • pull重複調用以獲取數據的函數,而未達到內部隊列高水位線
  • cancel當取消流時調用的函數,例如當cancel()在接收端調用方法

這是對象結構的簡單示例:

const stream = new ReadableStream({
  start(controller) {

}, pull(controller) {

}, cancel(reason) {

} })

start()pull()獲取控制器對象,是ReadableStreamDefaultController對象,它使您可以控制流狀態和內部隊列。

要將數據添加到流中,我們稱controller.enqueue()傳遞保存我們數據的變量:

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello')
  }
})

當我們準備關閉流時,我們調用controller.close()

cancel()得到一個reason這是提供給ReadableStream.cancel()取消流時的方法調用。

我們還可以傳遞一個可選的第二個對象,該對象確定排隊策略。它包含2個屬性:

  • highWaterMark內部隊列中可以存儲的塊總數。我們在談論時提到了這一點pull()
  • size,可用於更改塊大小的方法,以字節為單位

    {
    highWaterMark,
    size()
    }
    

這些主要用於控制物流壓力,尤其是在管鏈,這在Web API中仍處於試驗階段。

當。。。的時候highWaterMark達到流的值,一個背壓信號被發送到管道中的先前流,以告訴它們減慢數據壓力。

我們有2個內置對象來定義排隊策略:

  • ByteLengthQueuingStrategy等到塊的累積大小(以字節為單位)超過指定的高水位線
  • CountQueuingStrategy直到塊的累積數量超過指定的高水位線為止

設置32字節高水位標記的示例:

new ByteLengthQueuingStrategy({ highWaterMark: 32 * 1024 }

設置1塊高水位標記的示例:

new CountQueuingStrategy({ highWaterMark: 1 })

我提到這一點是為了告訴你能夠控制流入流中的數據量,並與其他參與者進行通信,但是由於事情很快變得複雜,因此我們不再贅述。

發球流

之前我曾提到,當我們開始閱讀流時,該流已被鎖定,其他讀者無法訪問該流,直到我們調用releaseLock()在上面。

不過,我們可以使用tee()流本身的方法:

const stream = //...
const tees = stream.tee()

tees現在是一個包含2個新流的數組,您可以使用這些流從中讀取tees[0]tees[1]

可寫流

關於可寫流,我們有3類對象:

  • WritableStream
  • WritableStreamDefaultReader
  • WritableStreamDefaultController

我們可以使用WritableStream對象創建流,供以後使用。

這是我們創建新的可寫流的方式:

const stream = new WritableStream()

我們必須傳遞一個對象才能有用。該對象將具有以下可選方法實現:

  • start()對像初始化時調用
  • write()當準備好將大塊寫入接收器時調用(在寫入數據之前保存流數據的基礎結構)
  • close()當我們完成編寫塊時調用
  • abort()當我們要發信號通知錯誤時調用

這是一個骨架:

const stream = new WritableStream({
  start(controller) {

}, write(chunk, controller) {

}, close(controller) {

}, abort(reason) {

} })

start()close()write()通過控制器,WritableStreamDefaultController對象實例。

至於ReadableStream(),我們可以將第二個對像傳遞給new WritableStream()設置排隊策略。

例如,讓我們創建一個流,該流給出了存儲在內存中的字符串,並創建了一個消費者可以連接的流。

我們首先定義一個解碼器,該解碼器將使用以下代碼將接收到的字節轉換為字符編碼API TextDecoder()構造函數:

const decoder = new TextDecoder("utf-8")

我們可以初始化WritableStream來實現close()方法,當完全接收到消息並且客戶端代碼調用它時,該方法將打印到控制台:

const writableStream = new WritableStream({
  write(chunk) {
    //...
  },
  close() {
    console.log(`The message is ${result}`)
  }
})

我們開始write()通過初始化ArrayBuffer並將其添加到塊中來實現。然後,我們繼續使用Encoding API的coder.decode()方法將此塊(一個字節)解碼為一個字符。然後,我們將此值添加到result我們在此對象之外聲明的字符串:

let result

const writableStream = new WritableStream({ write(chunk) { const buffer = new ArrayBuffer(2) const view = new Uint16Array(buffer) view[0] = chunk const decoded = decoder.decode(view, { stream: true }) result += decoded }, close() { //… } })

WritableStream對象現在已初始化。

現在,我們去實現將使用此流的客戶端代碼。

我們首先得到WritableStreamDefaultWriter來自的對象writableStream目的:

const writer = writableStream.getWriter()

接下來,我們定義要發送的消息:

const message = 'Hello!'

然後我們將編碼器初始化為編碼我們要發送到流中的字符:

const encoder = new TextEncoder()
const encoded = encoder.encode(message, { stream: true })

此時,字符串已被編碼為字節數組。現在,我們使用forEach在此數組上循環以將每個字節發送到流。每次致電之前write()流編寫器的方法,我們檢查ready該屬性返回一個Promise,因此我們僅在流編寫器準備就緒時進行寫操作:

encoded.forEach(chunk => {
  writer.ready.then(() => {
    return writer.write(chunk)
  })
})

我們現在唯一想念的就是關閉作家。forEach是一個同步循環,這意味著我們只有在每一項都寫完之後才能達到這一點。

我們仍然檢查ready屬性,然後我們調用close()方法:

writer.ready.then(() => {
  writer.close()
})

免費下載我的JavaScript初學者手冊


更多瀏覽器教程: