Streams API

使用流,我们可以从网络或其他来源接收资源,并在第一位到达时对其进行处理

使用流,我们可以从网络或其他来源接收资源,并在第一位到达时立即对其进行处理。

无需等待资源完全下载再使用,我们可以立即使用它。

什么是流

我想到的第一个示例是加载YouTube视频-您不必完全加载它就可以开始观看它。

或实时流式传输,甚至不知道内容何时结束。

内容甚至不必结束。它可以无限期生成。

Streams API

Streams API允许我们处理此类内容。

我们有2种不同的流传输模式:从流读取和写入流。

除Internet Explorer之外,所有现代浏览器均提供可读流。

可写流在Firefox和Internet Explorer上不可用。

与往常一样,检查caniuse.com有关此问题的最新信息。

让我们从可读流开始

可读流

关于可读流,我们有3类对象:

  • ReadableStream
  • ReadableStreamDefaultReader
  • ReadableStreamDefaultController

我们可以使用ReadableStream对象使用流。

这是可读流的第一个示例。 Fetch API允许从网络获取资源并将其作为流使用:

const stream = fetch('/resource')
  .then(response => response.body)

body提取响应的属性是ReadableStream对象实例。这是我们可读的流。

读者

呼唤getReader()在一个ReadableStream对象返回一个ReadableStreamDefaultReader对象,读者。我们可以这样获得:

const reader = fetch('/resource').then(response => response.body.getReader())

我们分块读取数据,其中块是字节或类型数组。将大块放入流中,我们一次读取一个大块。

单个流可以包含不同种类的块。

一旦我们有一个ReadableStreamDefaultReader对象,我们可以使用read()方法。

创建读取器后,流就被锁定,其他读取器无法从中获取任何块,直到我们调用releaseLock()在上面。

您可以准备一个流以实现此效果,稍后将对此进行更多说明。

从可读流中读取数据

一旦我们有一个ReadableStreamDefaultReader对象实例,我们可以从中读取数据。

这样可以从flaviocopes.com网页逐字节读取HTML内容流的第一块(出于CORS的原因,您可以在该网页上打开的DevTools窗口中执行此操作)。

fetch('https://flaviocopes.com/')
  .then(response => {
    response.body
      .getReader()
      .read()
      .then(({value, done}) => {
        console.log(value)
      })
  })

Uint8Array

如果打开每组数组项,则将进入到单个项。这些是字节,存储在Uint8Array

bytes stored in Uint8Array

您可以使用编码API

const decoder = new TextDecoder('utf-8')
fetch('https://flaviocopes.com/')
  .then(response => {
    response.body
      .getReader()
      .read()
      .then(({value, done}) => {
        console.log(decoder.decode(value))
      })
  })

这将打印出页面中加载的字符:

Printed characters

此新版本的代码将加载流的每个块,并将其打印出来:

(async () => {
  const fetchedResource = await fetch('https://flaviocopes.com/')
  const reader = await fetchedResource.body.getReader()

let charsReceived = 0 let result = ‘’

reader.read().then(function processText({ done, value }) { if (done) { console.log(‘Stream finished. Content received:’) console.log(result) return }

<span style="color:#a6e22e">console</span>.<span style="color:#a6e22e">log</span>(<span style="color:#e6db74">`Received </span><span style="color:#e6db74">${</span><span style="color:#a6e22e">result</span>.<span style="color:#a6e22e">length</span><span style="color:#e6db74">}</span><span style="color:#e6db74"> chars so far!`</span>)

<span style="color:#a6e22e">result</span> <span style="color:#f92672">+=</span> <span style="color:#a6e22e">value</span>

<span style="color:#66d9ef">return</span> <span style="color:#a6e22e">reader</span>.<span style="color:#a6e22e">read</span>().<span style="color:#a6e22e">then</span>(<span style="color:#a6e22e">processText</span>)

}) })()

我把它包在一个async立即调用的功能await

我们创建的processText()函数接收一个具有2个属性的对象。

  • done如果流结束并且我们获得了所有数据,则返回true
  • value当前收到的块的值

我们创建此递归函数来处理整个流。

创建流

警告:Edge和Internet Explorer不支持

我们刚刚看到了如何使用Fetch API生成的可读流,这是使用流的一种很好的方式,因为用例很实际。

现在让我们看一下如何创建可读流,以便我们可以使用我们的代码来访问资源。

我们已经使用了ReadableStream之前的对象。现在让我们使用new关键词:

const stream = new ReadableStream()

现在,此流不是很有用。它是一个空流,如果有人要读取它,则没有数据。

我们可以通过在初始化期间传递一个对象来定义流的行为。该对象可以定义那些属性:

  • start创建可读流时调用的函数。在这里,您可以连接到数据源并执行管理任务。
  • pull重复调用以获取数据的函数,而未达到内部队列高水位线
  • cancel当取消流时调用的函数,例如当cancel()在接收端调用方法

这是对象结构的简单示例:

const stream = new ReadableStream({
  start(controller) {

}, pull(controller) {

}, cancel(reason) {

} })

start()pull()获取控制器对象,是ReadableStreamDefaultController对象,它使您可以控制流状态和内部队列。

要将数据添加到流中,我们称controller.enqueue()传递保存我们数据的变量:

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello')
  }
})

当我们准备关闭流时,我们调用controller.close()

cancel()得到一个reason这是提供给ReadableStream.cancel()取消流时的方法调用。

我们还可以传递一个可选的第二个对象,该对象确定排队策略。它包含2个属性:

  • highWaterMark内部队列中可以存储的块总数。我们在谈论时提到了这一点pull()
  • size,可用于更改块大小的方法,以字节为单位

    {
    highWaterMark,
    size()
    }
    

这些主要用于控制物流压力,尤其是在管链,这在Web API中仍处于试验阶段。

当。。。的时候highWaterMark达到流的值,背压信号被发送到管道中的先前流,以告诉它们减慢数据压力。

我们有2个内置对象来定义排队策略:

  • ByteLengthQueuingStrategy等到块的累积大小(以字节为单位)超过指定的高水位线
  • CountQueuingStrategy直到块的累积数量超过指定的高水位线为止

设置32字节高水位标记的示例:

new ByteLengthQueuingStrategy({ highWaterMark: 32 * 1024 }

设置1块高水位标记的示例:

new CountQueuingStrategy({ highWaterMark: 1 })

我说这是要告诉你能够控制流入流中的数据量,并与其他参与者进行通信,但是由于事情很快变得复杂,因此我们不再赘述。

发球流

之前我曾提到,当我们开始阅读流时,该流已被锁定,其他读者无法访问该流,直到我们调用releaseLock()在上面。

不过,我们可以使用tee()流本身的方法:

const stream = //...
const tees = stream.tee()

tees现在是一个包含2个新流的数组,您可以使用这些流从中读取tees[0]tees[1]

可写流

关于可写流,我们有3类对象:

  • WritableStream
  • WritableStreamDefaultReader
  • WritableStreamDefaultController

我们可以使用WritableStream对象创建流,供以后使用。

这是我们创建新的可写流的方式:

const stream = new WritableStream()

我们必须传递一个对象才能有用。该对象将具有以下可选方法实现:

  • start()对象初始化时调用
  • write()当准备好将大块写入接收器时调用(在写入数据之前保存流数据的基础结构)
  • close()当我们完成编写块时调用
  • abort()当我们要发信号通知错误时调用

这是一个骨架:

const stream = new WritableStream({
  start(controller) {

}, write(chunk, controller) {

}, close(controller) {

}, abort(reason) {

} })

start()close()write()通过控制器,WritableStreamDefaultController对象实例。

至于ReadableStream(),我们可以将第二个对象传递给new WritableStream()设置排队策略。

例如,让我们创建一个流,该流给出了存储在内存中的字符串,并创建了一个消费者可以连接的流。

我们首先定义一个解码器,该解码器将使用以下代码将接收到的字节转换为字符编码API TextDecoder()构造函数:

const decoder = new TextDecoder("utf-8")

我们可以初始化WritableStream来实现close()方法,当完全接收到消息并且客户端代码调用它时,它将打印到控制台:

const writableStream = new WritableStream({
  write(chunk) {
    //...
  },
  close() {
    console.log(`The message is ${result}`)
  }
})

我们开始write()通过初始化ArrayBuffer并将其添加到块中来实现。然后,我们继续使用Encoding API的coder.decode()方法将此块(一个字节)解码为一个字符。然后,我们将此值添加到result我们在此对象之外声明的字符串:

let result

const writableStream = new WritableStream({ write(chunk) { const buffer = new ArrayBuffer(2) const view = new Uint16Array(buffer) view[0] = chunk const decoded = decoder.decode(view, { stream: true }) result += decoded }, close() { //… } })

WritableStream对象现在已初始化。

现在,我们去实现将使用此流的客户端代码。

我们首先得到WritableStreamDefaultWriter来自的对象writableStream目的:

const writer = writableStream.getWriter()

接下来,我们定义要发送的消息:

const message = 'Hello!'

然后我们将编码器初始化为编码我们要发送到流中的字符:

const encoder = new TextEncoder()
const encoded = encoder.encode(message, { stream: true })

此时,字符串已编码为字节数组。现在,我们使用forEach在此数组上循环以将每个字节发送到流。每次致电之前write()流编写器的方法,我们检查ready该属性返回一个Promise,因此我们仅在流编写器准备就绪时进行写操作:

encoded.forEach(chunk => {
  writer.ready.then(() => {
    return writer.write(chunk)
  })
})

我们现在唯一想念的就是关闭作家。forEach是一个同步循环,这意味着我们只有在每一项都写完之后才能达到这一点。

我们仍然检查ready属性,然后我们调用close()方法:

writer.ready.then(() => {
  writer.close()
})

免费下载我的JavaScript初学者手册


更多浏览器教程: