
The Streams API: Processing Data as it Arrives

Using streams, we can receive a resource from the network or other sources and process it as soon as the first bit arrives. This allows us to immediately work with the resource instead of waiting for it to fully download.

What is a stream?

A common example that demonstrates the benefit of streams is loading a YouTube video. You don’t have to wait for the entire video to load before you can start watching it. Streams also come into play with live streaming, where the content might not have a definitive end and could be generated indefinitely.

The Streams API

The Streams API provides us with the ability to work with streams. There are two streaming modes: reading from a stream and writing to a stream.

Readable streams are available in all modern browsers except for Internet Explorer, while writable streams are not available on Firefox and Internet Explorer. For the most up-to-date information on browser support, you can refer to caniuse.com.

Let’s start with readable streams.

Readable streams

There are three classes of objects related to readable streams:

  • ReadableStream
  • ReadableStreamDefaultReader
  • ReadableStreamDefaultController

We can consume streams using a ReadableStream object. To illustrate, the Fetch API allows us to retrieve a resource from the network and make it available as a stream:

const stream = fetch('/resource')
  .then(response => response.body);

In this example, the body property of the response object is a ReadableStream instance, which represents our readable stream.

The next step is to create a ReadableStreamDefaultReader object by calling the getReader() method on the ReadableStream object:

const reader = fetch('/resource')
  .then(response => response.body.getReader());

We read the data from the stream in chunks, where a chunk can be a single byte or something larger, such as a typed array. The chunks are enqueued in the stream, and we read them one at a time.

Once we have a ReadableStreamDefaultReader object, we can access the data using the read() method. It’s important to note that once a reader is created, the stream is locked, and no other reader can access it until we call releaseLock().
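
For example, here is a minimal sketch of the locking behavior, assuming stream is a ReadableStream we obtained earlier (for instance, a fetch response body):

const reader = stream.getReader();
console.log(stream.locked); // true: a second getReader() call would throw

reader.releaseLock();
console.log(stream.locked); // false: a new reader can now be acquired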

To read data from a readable stream, we can use the following code:

fetch('https://flaviocopes.com/')
  .then(response => {
    response.body
      .getReader()
      .read()
      .then(({ value, done }) => {
        console.log(value);
      });
  });

This code reads the first chunk of the HTML content from the flaviocopes.com web page and logs the Uint8Array that contains the bytes of that chunk.

We can transform these bytes into characters using the Encoding API. Here’s an example of decoding the bytes and printing the resulting characters:

const decoder = new TextDecoder('utf-8');

fetch('https://flaviocopes.com/')
  .then(response => {
    response.body
      .getReader()
      .read()
      .then(({ value, done }) => {
        console.log(decoder.decode(value));
      });
  });

Moving on, if we want to load and print every chunk of the stream, we can use the following code:

(async () => {
  const response = await fetch('https://flaviocopes.com/');
  const reader = response.body.getReader();
  const decoder = new TextDecoder('utf-8');

  let result = '';

  reader.read().then(function processText({ done, value }) {
    if (done) {
      console.log('Stream finished. Content received:');
      console.log(result);
      return;
    }

    // Decode the chunk of bytes into characters and append it.
    result += decoder.decode(value, { stream: true });

    console.log(`Received ${result.length} chars so far!`);

    return reader.read().then(processText);
  });
})();

This code fetches the resource, creates a ReadableStreamDefaultReader, and uses a recursive function to process the entire stream. Each chunk is decoded as it arrives, and the full content is printed when the stream ends.

Creating a stream

To create a stream, we can use the new keyword:

const stream = new ReadableStream();

By itself, this stream is empty and doesn’t contain any data. To define the behavior of the stream, we pass an object during initialization. This object includes the following properties:

  • start: A function called when the readable stream is created. It allows us to connect to the data source and perform administrative tasks.
  • pull: A function called repeatedly to get data as long as the internal queue’s high water mark is not reached.
  • cancel: A function called when the stream is cancelled.

Here’s an example of the object structure:

const stream = new ReadableStream({
  start(controller) {
    // Connect to the data source and perform administrative tasks.
  },
  pull(controller) {
    // Get data from the source.
  },
  cancel(reason) {
    // Handle stream cancellation.
  }
});

To add data to the stream, we can call the controller.enqueue() method and pass in the variable that holds our data:

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello');
  }
});

To close the stream, we can call controller.close().
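
Putting these pieces together, here is a minimal sketch of a custom stream that enqueues a couple of chunks, closes, and is then consumed with a reader:

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello');
    controller.enqueue('World');
    // Signal that no more chunks will be added.
    controller.close();
  }
});

const reader = stream.getReader();

reader.read().then(function process({ value, done }) {
  if (done) return;
  console.log(value); // 'Hello', then 'World'
  return reader.read().then(process);
});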

We can also pass an optional second object that determines the queuing strategy. It contains properties such as highWaterMark, which specifies the maximum number of chunks that can be stored in the internal queue, and size(chunk), a method that returns the size of a given chunk, used to measure progress toward the high water mark.

{
  highWaterMark,
  size(chunk) {
    // Return the size of this chunk.
  }
}

These properties are mainly used for controlling the pressure on the stream, especially in the context of a pipe chain (still experimental in the Web APIs). When the high water mark is reached, a backpressure signal is sent to the previous streams in the pipe to slow down the flow of data.
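
To give a feel for piping, here is a minimal sketch using pipeTo(), which consumes a readable stream and writes its chunks into a writable stream (writable streams are covered in a later section, and browser support for piping may vary):

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('some data');
    controller.close();
  }
});

const writable = new WritableStream({
  write(chunk) {
    console.log('Writing:', chunk);
  }
});

// Backpressure propagates automatically through the pipe.
readable.pipeTo(writable).then(() => console.log('Piping complete'));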

There are two built-in objects that define queuing strategies:

  • ByteLengthQueuingStrategy, which waits until the accumulated size in bytes of the chunks exceeds the specified high water mark.
  • CountQueuingStrategy, which waits until the accumulated number of chunks exceeds the specified high water mark.
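
For example, we can pass a CountQueuingStrategy as the second argument to the ReadableStream constructor:

const stream = new ReadableStream(
  {
    pull(controller) {
      // Get data from the source.
    }
  },
  // Allow up to 5 chunks in the internal queue before
  // backpressure is applied.
  new CountQueuingStrategy({ highWaterMark: 5 })
);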

Teeing streams

When we start reading from a stream, it becomes locked, and other readers cannot access it until we call releaseLock() on it. However, we can duplicate the stream using the tee() method:

const stream = //...
const tees = stream.tee();

Now, tees is an array that contains two new streams. You can read from these new streams using tees[0] and tees[1].
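
As a quick illustration, here is a sketch that duplicates a fetch body so two consumers can read it independently (wrapping each branch in a Response is just one convenient way to consume it):

fetch('https://flaviocopes.com/').then(response => {
  const [streamA, streamB] = response.body.tee();

  // Each branch can be consumed on its own.
  new Response(streamA).text().then(text => console.log('A:', text.length));
  new Response(streamB).text().then(text => console.log('B:', text.length));
});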

Writable streams

For writable streams, there are three classes of objects:

  • WritableStream
  • WritableStreamDefaultWriter
  • WritableStreamDefaultController

We can create streams that we can consume later using a WritableStream object. Here’s how to create a new writable stream:

const stream = new WritableStream();

For the stream to be useful, we pass an object that implements the following methods, all of which are optional:

  • start(): Called when the object is initialized.
  • write(): Called when a chunk is ready to be written to the sink (the underlying structure holding the stream data).
  • close(): Called when we have finished writing chunks.
  • abort(): Called when we want to signal an error.

Here’s an example skeleton:

const stream = new WritableStream({
  start(controller) {
    // Perform initialization tasks.
  },
  write(chunk, controller) {
    // Write the chunk to the sink.
  },
  close(controller) {
    // Handle closing of the stream.
  },
  abort(reason) {
    // Handle stream aborting.
  }
});

Similar to ReadableStream, we can pass a second object to new WritableStream() to set the queuing strategy.
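
For instance, a sketch of a writable stream with a ByteLengthQueuingStrategy, which measures chunks by their byteLength:

const stream = new WritableStream(
  {
    write(chunk) {
      // Write the chunk to the sink.
    }
  },
  // Apply backpressure once roughly 1 KB of chunks is queued.
  new ByteLengthQueuingStrategy({ highWaterMark: 1024 })
);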

To demonstrate how to create a writable stream, let’s say we want to create a stream that allows consumers to connect and receive a string stored in memory. We start by defining a decoder that will transform the bytes we receive into characters using the Encoding API TextDecoder() constructor:

const decoder = new TextDecoder('utf-8');

Next, we initialize the writable stream and implement the close() method, which prints the fully received message to the console once the client code closes the stream:

const writableStream = new WritableStream({
  write(chunk) {
    // Process the chunk.
  },
  close() {
    console.log(`The message is ${result}`);
  }
});

The implemented write() method receives each chunk (a single byte in our case), wraps it in a typed array so the decoder can process it, decodes it into characters using decoder.decode(), and appends the decoded characters to a result string:

let result = '';

const writableStream = new WritableStream({
  write(chunk) {
    // Each chunk is a single byte; wrap it in a typed array
    // so the decoder can process it.
    const view = new Uint8Array([chunk]);
    const decoded = decoder.decode(view, { stream: true });
    result += decoded;
  },
  close() {
    // Handle closing of the stream.
  }
});

Now that the WritableStream object is initialized, let’s implement the client code that will use the stream. First, we need to get the WritableStreamDefaultWriter object from the writableStream object:

const writer = writableStream.getWriter();

We’ll define a message that needs to be sent:

const message = 'Hello!';

Then, we initialize the encoder to encode the characters we want to send to the stream:

const encoder = new TextEncoder();
const encoded = encoder.encode(message);

Since forEach is a synchronous loop, we wait for the writer's ready promise to resolve before each call to the write() method of the stream writer. This ensures that we only write when the stream writer is ready:

encoded.forEach(chunk => {
  writer.ready.then(() => {
    return writer.write(chunk);
  });
});

Finally, we close the writer. Again, we wait for the ready promise and then call the close() method:

writer.ready.then(() => {
  writer.close();
});

This is a basic example demonstrating how to create and use writable streams.
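
Putting all the pieces from this section together, the complete example reads as follows:

const decoder = new TextDecoder('utf-8');
let result = '';

const writableStream = new WritableStream({
  write(chunk) {
    // Each chunk is a single byte; wrap it in a typed array
    // so the decoder can process it.
    result += decoder.decode(new Uint8Array([chunk]), { stream: true });
  },
  close() {
    console.log(`The message is ${result}`);
  }
});

const writer = writableStream.getWriter();
const message = 'Hello!';
const encoder = new TextEncoder();
const encoded = encoder.encode(message);

// Write each byte when the writer is ready, then close.
encoded.forEach(chunk => {
  writer.ready.then(() => writer.write(chunk));
});

writer.ready.then(() => writer.close());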
