Streams API

Using streams, we can receive a resource from the network, or from other sources, and process it as soon as the first bit arrives.

There is no need to wait for the resource to be completely downloaded before using it: we can work with it immediately.

What is a stream

The first example that comes to mind is loading a YouTube video: you don't have to fully load it before you can start watching it.

Or live streaming, where you don't even know when the content will end.

The content doesn't even have to end. It can be generated indefinitely.

Streams API

The Streams API allows us to process such content.

We have 2 different streaming modes: reading from the stream and writing to the stream.

With the exception of Internet Explorer, all modern browsers provide readable streams.

Writable streams are not available in Internet Explorer, and Firefox only added them in version 100.

As always, check caniuse.com for the latest information on this issue.

Let's start with readable streams.

Readable stream

Regarding readable streams, we have 3 types of objects:

  • ReadableStream
  • ReadableStreamDefaultReader
  • ReadableStreamDefaultController

We can consume streams using a ReadableStream object.

Here is a first example of a readable stream. The Fetch API lets us get a resource from the network and makes it available as a stream:

const stream = fetch('/resource')
  .then(response => response.body)

The body property of the fetch response is a ReadableStream object instance. This is our readable stream.

The reader

Calling getReader() on a ReadableStream object returns a ReadableStreamDefaultReader object, the reader. We can get it like this:

const reader = fetch('/resource').then(response => response.body.getReader())

We read data in chunks, where a chunk is a byte or a typed array. Chunks are enqueued in the stream, and we read them one chunk at a time.

A single stream can contain different kinds of chunks.

Once we have a ReadableStreamDefaultReader object, we can access the data using the read() method.

As soon as a reader is created, the stream is locked, and no other reader can get chunks from it until we call releaseLock() on the reader.

You can tee a stream to let several readers consume it, which is explained later on.
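The locking behavior can be observed without touching the network. The following is a minimal sketch, assuming a runtime that exposes the standard ReadableStream global; the tiny in-memory stream is a stand-in for a response body and is not part of the original example.

```javascript
// A tiny in-memory stream stands in for a network response body (illustrative only).
const lockedDemoStream = new ReadableStream({
  start(controller) {
    controller.enqueue('chunk')
    controller.close()
  }
})

// Getting a reader locks the stream.
const firstReader = lockedDemoStream.getReader()
console.log(lockedDemoStream.locked)

// Asking for a second reader while the stream is locked throws a TypeError.
let secondReaderError = null
try {
  lockedDemoStream.getReader()
} catch (err) {
  secondReaderError = err
}
console.log(secondReaderError instanceof TypeError)

// Releasing the lock makes the stream available to a new reader.
firstReader.releaseLock()
console.log(lockedDemoStream.locked)
const secondReader = lockedDemoStream.getReader()
```

The locked property is a convenient way to check whether a stream is already in use before asking for a reader.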

Reading data from a readable stream

Once we have a ReadableStreamDefaultReader object instance, we can read data from it.

This is how you can read the first chunk of the HTML content stream from the flaviocopes.com web page, byte by byte (for CORS reasons, you can run this in a DevTools window opened on that web page).

fetch('https://flaviocopes.com/')
  .then(response => {
    response.body
      .getReader()
      .read()
      .then(({value, done}) => {
        console.log(value)
      })
  })

[Image: Uint8Array]

If you expand each group of array items, you get down to the single items. Those are bytes, stored in a Uint8Array:

[Image: bytes stored in Uint8Array]

You can transform those bytes into characters using the Encoding API:

const decoder = new TextDecoder('utf-8')
fetch('https://flaviocopes.com/')
  .then(response => {
    response.body
      .getReader()
      .read()
      .then(({value, done}) => {
        console.log(decoder.decode(value))
      })
  })

This will print out the characters loaded in the page:

[Image: printed characters]

This new version of the code loads every chunk of the stream and prints it:

(async () => {
  const fetchedResource = await fetch('https://flaviocopes.com/')
  const reader = fetchedResource.body.getReader()
  // Chunks arrive as Uint8Array, so decode them before appending to the string
  const decoder = new TextDecoder('utf-8')

  let result = ''

  reader.read().then(function processText({ done, value }) {
    if (done) {
      console.log('Stream finished. Content received:')
      console.log(result)
      return
    }

    console.log(`Received ${result.length} chars so far!`)

    result += decoder.decode(value, { stream: true })

    // Read the next chunk and process it with this same function
    return reader.read().then(processText)
  })
})()

I wrapped this in an immediately-invoked async function so that I can use await.

The processText() function we created receives an object with 2 properties.

  • done: true if the stream ended and we got all the data
  • value: the value of the chunk currently received

We create this recursive function to process the entire stream.
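The same traversal can also be written as a loop with async/await instead of recursion. This is a sketch rather than the article's own code: readAllText is a hypothetical helper name, and the in-memory stream replaces the fetch() response so the snippet runs without a network.

```javascript
// Hypothetical helper: reads every chunk of a byte stream and returns the decoded text.
async function readAllText(stream) {
  const reader = stream.getReader()
  const decoder = new TextDecoder('utf-8')
  let result = ''

  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    // stream: true lets a multi-byte character span two chunks
    result += decoder.decode(value, { stream: true })
  }

  // Flush anything the decoder is still holding
  result += decoder.decode()
  return result
}

// Illustrative in-memory stream: the two bytes of "é" are split across chunks.
const demoByteStream = new ReadableStream({
  start(controller) {
    controller.enqueue(new Uint8Array([72, 195]))
    controller.enqueue(new Uint8Array([169]))
    controller.close()
  }
})

readAllText(demoByteStream).then(text => console.log(text))
```

With a real response, the call would be readAllText(response.body). The loop form keeps the done check and the read in one place, which some readers find easier to follow than the recursive callback.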

Creating a stream

Warning: Edge and Internet Explorer are not supported

We just saw how to use the readable stream generated by the Fetch API. This is a great way to use streams because the use cases are very practical.

Now let's look at how to create a readable stream, so that our own code can give access to a resource.

We already used a ReadableStream object before. Let's now create a brand new one using the new keyword:

const stream = new ReadableStream()

Now, this stream is not very useful. It is an empty stream, and if someone wants to read it, there is no data.

We can define the behavior of the stream by passing an object during initialization. The object can define these properties:

  • start: a function called when the readable stream is created. In here, you connect to the data source and perform administrative tasks.
  • pull: a function called repeatedly to get data, as long as the high water mark of the internal queue has not been reached
  • cancel: a function called when the stream is canceled, for example when the cancel() method is called on the receiving end

This is a simple example of the object structure:

const stream = new ReadableStream({
  start(controller) {

  },
  pull(controller) {

  },
  cancel(reason) {

  }
})

start() and pull() get a controller object, an instance of ReadableStreamDefaultController, which lets you control the stream state and the internal queue.

To add data to the stream, we call controller.enqueue(), passing the variable that holds our data:

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello')
  }
})

When we are ready to close the stream, we call controller.close().

cancel() gets a reason, which is the value passed to the ReadableStream.cancel() method call when the stream is canceled.
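Putting pull(), enqueue(), close() and cancel() together, here is a small sketch of a stream that produces a fixed number of chunks. createCounterStream, collect and the onCancel callback are hypothetical names chosen for illustration, and the counter is assumed to be called with a limit of at least 1.

```javascript
// Hypothetical example: a stream that emits 1, 2, ..., limit and then closes.
function createCounterStream(limit, onCancel = () => {}) {
  let current = 0
  return new ReadableStream({
    pull(controller) {
      // Called whenever the internal queue has room for more data
      current += 1
      controller.enqueue(current)
      if (current >= limit) controller.close()
    },
    cancel(reason) {
      // Receives whatever was passed to stream.cancel(reason)
      onCancel(reason)
    }
  })
}

// Hypothetical helper: reads a stream to the end and returns its chunks.
async function collect(stream) {
  const reader = stream.getReader()
  const chunks = []
  while (true) {
    const { done, value } = await reader.read()
    if (done) return chunks
    chunks.push(value)
  }
}

// Logs the chunks 1, 2 and 3
collect(createCounterStream(3)).then(chunks => console.log(chunks))
```

Because the data is produced in pull() rather than start(), nothing beyond the queue's capacity is generated until a consumer actually reads.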

We can also pass an optional second object, which determines the queuing strategy. It contains 2 properties:

  • highWaterMark: the total size of the chunks that can be stored in the internal queue. We mentioned this when talking about pull() before.
  • size: a method that returns the size of a given chunk, for example in bytes. Without it, every chunk counts as 1.

{
  highWaterMark,
  size()
}

These are mainly useful to control the pressure on a stream, especially in the context of a pipe chain, which was still experimental in the Web APIs at the time of writing.

When the highWaterMark value of a stream is reached, a backpressure signal is sent to the previous streams in the pipe to tell them to slow down the data flow.
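One way to watch the high water mark at work is the controller's desiredSize property, which reports how much room is left in the internal queue. The sketch below assumes a plain strategy object with a highWaterMark of 2 and no size() method, so each chunk counts as 1; the variable names are illustrative.

```javascript
const desiredSizes = []

const boundedStream = new ReadableStream(
  {
    start(controller) {
      // Record the remaining room before and after each enqueue
      desiredSizes.push(controller.desiredSize)
      for (const chunk of ['a', 'b', 'c']) {
        controller.enqueue(chunk)
        desiredSizes.push(controller.desiredSize)
      }
    }
  },
  { highWaterMark: 2 }
)

// start() runs synchronously inside the constructor, so the values are already recorded
console.log(desiredSizes)
```

A desiredSize of zero or less is the signal that the queue is full and a well-behaved source should stop producing until the consumer catches up.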

We have 2 built-in objects to define the queuing strategy:

  • ByteLengthQueuingStrategy: waits until the cumulative size of the chunks, in bytes, exceeds the specified high water mark
  • CountQueuingStrategy: waits until the cumulative number of chunks exceeds the specified high water mark

Example of setting a high water mark of 32 KiB (32 * 1024 bytes):

new ByteLengthQueuingStrategy({ highWaterMark: 32 * 1024 })

Example of setting a high water mark of 1 chunk:

new CountQueuingStrategy({ highWaterMark: 1 })

I mention this to let you know that you can control the amount of data flowing into a stream and communicate with the other actors involved, but since things quickly become complicated, we will not go into more detail.

Teeing streams

I mentioned earlier that as soon as we start reading a stream, it is locked and other readers cannot access it until we call releaseLock() on the reader.

We can, however, duplicate the stream using the tee() method on the stream itself:

const stream = //...
const tees = stream.tee()

tees is now an array that contains 2 new streams, which you can read from using tees[0] and tees[1].
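As a quick check that both branches really deliver the same data, here is a sketch using a small in-memory source. The names teeSource, left, right and collectChunks are illustrative and not part of the original example.

```javascript
// Illustrative in-memory source with two chunks
const teeSource = new ReadableStream({
  start(controller) {
    controller.enqueue('a')
    controller.enqueue('b')
    controller.close()
  }
})

// tee() returns an array of two streams; the source itself becomes locked
const [left, right] = teeSource.tee()

// Hypothetical helper: reads a stream to the end and returns its chunks.
async function collectChunks(stream) {
  const reader = stream.getReader()
  const chunks = []
  while (true) {
    const { done, value } = await reader.read()
    if (done) return chunks
    chunks.push(value)
  }
}

const bothBranches = Promise.all([collectChunks(left), collectChunks(right)])
bothBranches.then(([fromLeft, fromRight]) => console.log(fromLeft, fromRight))
```

Each branch has its own lock and its own reader, so two consumers can work through the same data independently.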

Writable stream

Regarding writable streams, we have 3 types of objects:

  • WritableStream
  • WritableStreamDefaultWriter
  • WritableStreamDefaultController

We can use the WritableStream object to create a stream for later use.

This is how we create a new writable stream:

const stream = new WritableStream()

We must pass an object for the stream to be useful. This object can have the following optional method implementations:

  • start(): called when the object is initialized
  • write(): called when a chunk is ready to be written to the sink (the underlying structure that holds the stream data before it is written)
  • close(): called when we have finished writing chunks
  • abort(): called when we want to signal an error

This is a skeleton:

const stream = new WritableStream({
  start(controller) {

  },
  write(chunk, controller) {

  },
  close(controller) {

  },
  abort(reason) {

  }
})

start(), close() and write() get passed the controller, a WritableStreamDefaultController object instance.

As with ReadableStream(), we can pass a second object to new WritableStream() to set the queuing strategy.
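To see when each of these methods runs, the following sketch records the calls a sink receives while two chunks are written and the stream is closed. traceSinkCalls is a hypothetical helper name; the sink does nothing except log.

```javascript
// Hypothetical helper: writes two chunks to a logging sink and returns the call order.
async function traceSinkCalls() {
  const calls = []

  const loggingStream = new WritableStream({
    start() {
      calls.push('start')
    },
    write(chunk) {
      calls.push(`write:${chunk}`)
    },
    close() {
      calls.push('close')
    },
    abort(reason) {
      // Not reached in this sketch: nothing aborts the stream
      calls.push(`abort:${reason}`)
    }
  })

  const writer = loggingStream.getWriter()
  await writer.write('a')
  await writer.write('b')
  await writer.close()
  return calls
}

traceSinkCalls().then(calls => console.log(calls))
```

start() runs once up front, write() runs once per chunk in order, and close() runs only after every pending write has been handled.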

As an example, let's create a stream that, given a string stored in memory, lets a consumer connect to it and receive that string.

We start by defining a decoder that we will use to transform the bytes we receive into characters, using the Encoding API TextDecoder() constructor:

const decoder = new TextDecoder("utf-8")

We can initialize the WritableStream by implementing the close() method, which prints the message to the console once it has been fully received and the client code calls it:

const writableStream = new WritableStream({
  write(chunk) {
    //...
  },
  close() {
    console.log(`The message is ${result}`)
  }
})

We start the write() implementation by initializing an ArrayBuffer and adding the chunk to it. We then decode this chunk, which is a single byte, into a character using the decoder.decode() method of the Encoding API. Finally, we add this value to a result string that we declare outside of this object:

let result = ''

const writableStream = new WritableStream({
  write(chunk) {
    // Each chunk is a single byte: copy it into a one-byte buffer and decode it
    const buffer = new ArrayBuffer(1)
    const view = new Uint8Array(buffer)
    view[0] = chunk
    const decoded = decoder.decode(view, { stream: true })
    result += decoded
  },
  close() {
    //...
  }
})

The WritableStream object is now initialized.

Now, let's implement the client code that will use this stream.

We first get the WritableStreamDefaultWriter object from the writableStream object:

const writer = writableStream.getWriter()

Next, we define the message to be sent:

const message = 'Hello!'

Then we initialize the encoder to encode the characters we want to send to the stream:

const encoder = new TextEncoder()
const encoded = encoder.encode(message)

At this point, the string has been encoded into a byte array. Now we use a forEach loop on this array to send each byte to the stream. Before each call to the write() method of the stream writer, we check the ready property, which returns a promise, so we only write when the stream writer is ready:

encoded.forEach(chunk => {
  writer.ready.then(() => {
    return writer.write(chunk)
  })
})

The only thing we miss now is to close the writer. forEach is a synchronous loop, which means that we reach this point only after every write has been scheduled.

We still check the ready property, and then we call the close() method:

writer.ready.then(() => {
  writer.close()
})
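For comparison, the whole exchange can be wrapped in one async function that awaits each write in turn. This is a sketch under a few assumptions: sendMessage is a hypothetical name, and the decoded text is returned to the caller instead of being logged from close().

```javascript
// Hypothetical helper: sends a string byte by byte through a WritableStream
// and resolves with the text reassembled on the receiving side.
async function sendMessage(message) {
  const decoder = new TextDecoder('utf-8')
  let result = ''

  const writableStream = new WritableStream({
    write(chunk) {
      // Each chunk is one byte; stream: true handles multi-byte characters
      result += decoder.decode(new Uint8Array([chunk]), { stream: true })
    }
  })

  const writer = writableStream.getWriter()
  const encoded = new TextEncoder().encode(message)

  for (const byte of encoded) {
    // Wait until the writer can accept more data, then write one byte
    await writer.ready
    await writer.write(byte)
  }

  await writer.close()
  return result
}

sendMessage('Hello!').then(text => console.log(`The message is ${text}`))
```

Awaiting inside a for...of loop makes the ordering explicit: close() is only called once the last write has actually completed.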
