使用流式API,我們可以在第一個位元抵達時,立即從網路或其他來源接收資源並處理它。
不必等待資源完全下載後再使用它,可以立即開始處理。
什麼是流?
首先遇到的例子是載入YouTube視頻 - 在您開始觀看它之前,您不必完全載入它。
或者是直播,您甚至不知道內容何時結束。
內容甚至不必結束,它可以無限生成。
Streams API(流API)
Streams API允許我們處理這種類型的內容。
我們有兩種不同的流模式:從流程讀取和寫入流程。
可讀流在除了Internet Explorer之外的所有現代瀏覽器中都可用。
可寫流在Firefox和Internet Explorer中不可用。
一如既往,請參閱caniuse.com上有關此問題的最新信息。
讓我們從可讀流開始
可讀流
當涉及到可讀流時,我們有三種對象類別:
- ReadableStream
- ReadableStreamDefaultReader
- ReadableStreamDefaultController
我們可以使用ReadableStream對像來消耗流。
這是第一個可讀流的例子。Fetch API允許從網絡中獲取資源並將其作為流程提供。
const stream = fetch('/resource')
.then(response => response.body)
fetch響應的body屬性是ReadableStream對象實例。這就是我們的可讀流。
閱讀器
在ReadableStream對像上調用getReader()方法會返回ReadableStreamDefaultReader對像,即閱讀器。我們可以按如下方式獲取:
const reader = fetch('/resource').then(response => response.body.getReader())
我們按塊讀取數據,其中一個塊是一個位元組或一個類型化數組。將塊排入流程,並逐個塊讀取它們。
單個流程可以包含不同類型的块。
一旦我們有了ReadableStreamDefaultReader對象,我們可以使用read()方法訪問數據。
只要創建了閱讀器,流程就鎖定了,其他讀者不能從它獲取塊,直到我們在閱讀器上調用releaseLock()。
您可以使用tee()方法來實現這種效果,後面將更詳細介紹。
從可讀流讀取數據
一旦我們有了ReadableStreamDefaultReader對象實例,就可以從中讀取數據。
以下是如何按字節(由於CORS原因,您可以在打開了該頁面的DevTools窗口中執行此操作)逐個字節讀取flaviocopes.com網頁的首個块的數據的方法。
fetch('https://flaviocopes.com/')
.then(response => {
response.body
.getReader()
.read()
.then(({value, done}) => {
console.log(value)
})
})
如果你打開每組數組項,你會得到單個項目。這些都是以Uint8Array存儲的字元:
您可以使用Encoding API將這些字節轉換為字符:
const decoder = new TextDecoder('utf-8')
fetch('https://flaviocopes.com/')
.then(response => {
response.body
.getReader()
.read()
.then(({value, done}) => {
console.log(decoder.decode(value))
})
})
這個版本的代碼會加載流的每個塊並將其打印出來:
(async () => {
const fetchedResource = await fetch('https://flaviocopes.com/')
const reader = await fetchedResource.body.getReader()
let charsReceived = 0
let result = ''
reader.read().then(function processText({ done, value }) {
if (done) {
console.log('Stream finished. Content received:')
console.log(result)
return
}
console.log(`Received ${result.length} chars so far!`)
result += value
return reader.read().then(processText)
})
})()
我將其封裝在了一個立即調用的async函數中以使用await。
我們創建的processText()函數接收一個帶有2個屬性的對象。
- 如果流結束並且我們獲得了所有數據,則done為true
- value是當前接收的塊的值
我們創建了這個遞歸函數來處理整個流。
創建stream(創建流)
警告:Edge和Internet Explorer不支持
我們剛才看到了如何使用Fetch API生成的可讀流,這是使用流來開始工作的一種很好的方式,因為場景是實際的。
現在讓我們看看如何創建一個可讀流,以便我們可以用我們的代碼訪問資源。
我們之前已經使用了ReadableStream對像。現在讓我們使用new關鍵字創建一個全新的ReadableStream:
const stream = new ReadableStream()
這個流現在沒有什麼用處。它是一個空的流,如果有人想要讀取它,那就沒有數據。
我們可以在初始化期間通過傳遞一個對像來定義流的行為方式。這個對象可以定義以下屬性:
- start:當創建可讀流時調用的函數。在這裡,您可以連接到資料源並執行管理任務。
- pull:在內部隊列高水位標達到之前,重複調用的函數以獲取數據。
- cancel:當流被取消時調用的函數,例如在接收端上調用cancel()方法時。
以下是對象結構的基本示例:
const stream = new ReadableStream({
start(controller) {
},
pull(controller) {
},
cancel(reason) {
}
})
start()和pull()獲得了控制器對像,它是ReadableStreamDefaultController對像的實例,它使您可以控制流狀態和內部隊列。
要將數據添加到流中,我們調用controller.enqueue(),並傳遞保存我們的數據的變量:
const stream = new ReadableStream({
start(controller) {
controller.enqueue('Hello')
}
})
當我們準備關閉流時,我們調用controller.close()。
cancel()得到一個字符串,當取消流時提供給ReadableStream.cancel()方法調用。
我們還可以傳遞一個可選的第二個對象,該對象確定排隊策略。它包含2個屬性:
- highWaterMark:可以存儲在內部隊列中的塊的總數量。在之前講到pull()時,我們提到過這一點。
- size:一種方法,您可以使用它來更改塊大小,以字節表示。
以下是它的結構:
{
highWaterMark,
size()
}
這些主要用於控制流上的壓力,尤其是在Web API的pipe鏈的上下文中,這還是實驗階段。
當流的highWaterMark值達到時,會將壓力信號發送到管道中的前一個流,以告訴它們減緩數據壓力。
我們有兩個內建對象定義了排隊策略:
- ByteLengthQueuingStrategy:等待塊的累積大小(以字節為單位)超過指定的高水位標。
- CountQueuingStrategy:等待塊的累積數量超過指定的高水位標。
設置32字節的高水位標的示例:
new ByteLengthQueuingStrategy({ highWaterMark: 32 * 1024 }
設置1塊高水位標的示例:
new CountQueuingStrategy({ highWaterMark: 1 })
我提到這一點是告訴您,您可以控制流中流動的數據量,並與其他參與方通信,但我們不會深入研究更多細節,因為事情變得相當複雜。
拆分流
我之前提到,一旦我們開始讀取流,它就被鎖定了,其他讀者無法訪問它,直到我們在其上調用releaseLock()。
但是,我們可以使用流程本身的tee()方法複製流:
const stream = //...
const tees = stream.tee()
tees現在是一個包含兩個新流程的數組,你可以使用tees[0]和tees[1]來從中讀取。
可寫流
當涉及到可寫流時,我們有三種對象類別:
- WritableStream
- WritableStreamDefaultReader
- WritableStreamDefaultController
我們可以創建我們稍後可以使用的流,使用WritableStream對像。
以下是如何創建一個新的可寫流:
const stream = new WritableStream()
為了有用,我們必須傳遞一個對像。該對象將具有以下可選方法的實現:
- start():初始化對像時調用
- write():當塊準備好寫入接收方時調用(流被寫入之前的底層結構)
- close():當我們完成寫入塊時調用
- abort():當我們想要發送錯誤時調用
以下是藍圖:
const stream = new WritableStream({
start(controller) {
},
write(chunk, controller) {
},
close(controller) {
},
abort(reason) {
}
})
start()、close()和write()會被傳遞控制器,即WritableStreamDefaultController對像實例。
與ReadableStream()一樣,我們可以向new WritableStream()傳遞第二個對象,其中設置了queueing策略。
例如,讓我們創建一個流,根據存儲在內存中的字符串創建一個可以連接到的流。
首先,我們定義一個解碼器,我們將使用Encoding API的TextDecoder()構造函數將我們接收到的字節轉換為字符:
const decoder = new TextDecoder("utf-8")
我們可以通過實現close()
方法來初始化WritableStream
,當消息完全接收並且客戶端代碼調用它時,它將打印到控制台:
const writableStream = new WritableStream({
write(chunk) {
//...
},
close() {
console.log(`The message is ${result}`)
}
})
我們通過初始化ArrayBuffer並將其添加到塊中開始write()
實現。然後,我們繼續使用解碼器將這個塊(一個字節)解碼為字符,使用Encoding API的解碼器解碼方法。然後,將此值添加到宣告在此對象之外的“result”字符串中:
let result
const writableStream = new WritableStream({
write(chunk) {
const buffer = new ArrayBuffer(2)
const view = new Uint16Array(buffer)
view[0] = chunk
const decoded = decoder.decode(view, { stream: true })
result += decoded
},
close() {
//...
}
})
現在,WritableStream對像已經初始化。
現在,我們即將實現將使用此流的客戶端代碼。
首先我們從writableStream對象中獲取WritableStreamDefaultWriter對象:
const writer = writableStream.getWriter()
接下來,我們定義要發送的消息:
const message = 'Hello!'
然後,我們初始化編碼器以將我們要發送到流中的字符進行編碼:
const encoder = new TextEncoder()
const encoded = encoder.encode(message, { stream: true })
此時,字符串已經編碼為字節數組。現在,我們使用forEach
循環在此數組上發送每個字節到流中。在調用流寫入者的write()方法之前,我們檢查ready
屬性,該屬性返回Promise,所以只有在流寫入者準備好時我們才寫入:
encoded.forEach(chunk => {
writer.ready.then(() => {
return writer.write(chunk)
})
})
我們現在只剩下關閉writer。forEach
是同步循環,這意味著我們只有在每個項目都被寫入之後才會到達這一點。
我們仍然檢查ready
屬性,然後調用close()方法:
writer.ready.then(() => {
writer.close()
})