Streams API

ストリームを使用して、ネットワークまたは他のソースからリソースを受け取り、最初のビットが到着するとすぐにそれを処理できます

ストリームを使用して、ネットワークまたは他のソースからリソースを受け取り、最初のビットが到着するとすぐにそれを処理できます。

リソースが完全にダウンロードされるのを待ってから使用する代わりに、すぐに作業できます。

ストリームとは

頭に浮かぶ最初の例は、YouTubeビデオのロードです。視聴を開始する前に、完全にロードする必要はありません。

または、コンテンツがいつ終了するかさえわからないライブストリーミング。

コンテンツを終了する必要はありません。無期限に生成される可能性があります。

Streams API

Streams APIを使用すると、この種のコンテンツを操作できます。

ストリームからの読み取りとストリームへの書き込みの2つの異なるストリーミングモードがあります。

読み取り可能なストリームは、InternetExplorerを除くすべての最新のブラウザーで使用できます。

書き込み可能なストリームは、FirefoxおよびInternetExplorerでは使用できません。

いつものように、チェックcaniuse.comこの問題に関する最新情報については。

読み取り可能なストリームから始めましょう

読み取り可能なストリーム

読み取り可能なストリームに関しては、3つのクラスのオブジェクトがあります。

  • ReadableStream
  • ReadableStreamDefaultReader
  • ReadableStreamDefaultController

ReadableStreamオブジェクトを使用してストリームを消費できます。

これは、読み取り可能なストリームの最初の例です。 Fetch APIを使用すると、ネットワークからリソースを取得して、ストリームとして利用できるようになります。

const stream = fetch('/resource')
  .then(response => response.body)

ザ・bodyフェッチ応答のプロパティはReadableStreamオブジェクトインスタンス。これは私たちの読み取り可能なストリームです。

読者

呼び出しgetReader()ReadableStreamオブジェクトはReadableStreamDefaultReaderオブジェクト、リーダー。私たちはそれをこのように得ることができます:

const reader = fetch('/resource').then(response => response.body.getReader())

データをチャンクで読み取ります。チャンクはバイトまたは型付き配列です。チャンクはストリームにエンキューされ、一度に1つのチャンクを読み取ります。

1つのストリームにさまざまな種類のチャンクを含めることができます。

一度私たちはReadableStreamDefaultReaderオブジェクトを使用してデータにアクセスできますread()方法。

リーダーが作成されるとすぐに、ストリームはロックされ、他のリーダーは、呼び出すまで、ストリームからチャンクを取得できません。releaseLock()その上に。

この効果を達成するためにストリームをティーすることができます。これについては後で詳しく説明します。

読み取り可能なストリームからのデータの読み取り

一度私たちはReadableStreamDefaultReaderオブジェクトインスタンスからデータを読み取ることができます。

これは、flaviocopes.com WebページからHTMLコンテンツのストリームの最初のチャンクをバイトごとに読み取る方法です(CORSの理由から、そのWebページで開いたDevToolsウィンドウでこれを実行できます)。

fetch('https://flaviocopes.com/')
  .then(response => {
    response.body
      .getReader()
      .read()
      .then(({value, done}) => {
        console.log(value)
      })
  })

Uint8Array

配列アイテムの各単一グループを開くと、単一アイテムが表示されます。それらはバイトであり、Uint8Array

bytes stored in Uint8Array

これらのバイトを使用して文字に変換できますエンコーディングAPI

const decoder = new TextDecoder('utf-8')
fetch('https://flaviocopes.com/')
  .then(response => {
    response.body
      .getReader()
      .read()
      .then(({value, done}) => {
        console.log(decoder.decode(value))
      })
  })

これにより、ページにロードされた文字が出力されます。

Printed characters

この新しいバージョンのコードは、ストリームのすべてのチャンクをロードし、それを出力します。

(async () => {
  const fetchedResource = await fetch('https://flaviocopes.com/')
  const reader = await fetchedResource.body.getReader()

let charsReceived = 0 let result = ‘’

reader.read().then(function processText({ done, value }) { if (done) { console.log(‘Stream finished. Content received:’) console.log(result) return }

<span style="color:#a6e22e">console</span>.<span style="color:#a6e22e">log</span>(<span style="color:#e6db74">`Received </span><span style="color:#e6db74">${</span><span style="color:#a6e22e">result</span>.<span style="color:#a6e22e">length</span><span style="color:#e6db74">}</span><span style="color:#e6db74"> chars so far!`</span>)

<span style="color:#a6e22e">result</span> <span style="color:#f92672">+=</span> <span style="color:#a6e22e">value</span>

<span style="color:#66d9ef">return</span> <span style="color:#a6e22e">reader</span>.<span style="color:#a6e22e">read</span>().<span style="color:#a6e22e">then</span>(<span style="color:#a6e22e">processText</span>)

}) })()

私はこれを包みましたasync使用する即時呼び出し関数await

作成するprocessText()関数は、2つのプロパティを持つオブジェクトを受け取ります。

  • doneストリームが終了し、すべてのデータを取得した場合はtrue
  • value受信した現在のチャンクの値

この再帰関数を作成して、ストリーム全体を処理します。

ストリームの作成

警告:EdgeおよびInternetExplorerではサポートされていません

Fetch APIによって生成された読み取り可能なストリームを使用する方法を見てきました。これは、ユースケースが実用的であるため、ストリームの操作を開始するための優れた方法です。

次に、コードを使用してリソースへのアクセスを許可できるように、読み取り可能なストリームを作成する方法を見てみましょう。

私たちはすでに使用しましたReadableStream前のオブジェクト。それでは、を使用して新しいものを作成しましょうnewキーワード:

const stream = new ReadableStream()

このストリームは現在、あまり役に立ちません。それは空のストリームであり、誰かがそれから読み取りたい場合、データはありません。

初期化中にオブジェクトを渡すことで、ストリームの動作を定義できます。このオブジェクトは、これらのプロパティを定義できます。

  • start読み取り可能なストリームが作成されたときに呼び出される関数。ここでは、データソースに接続し、管理タスクを実行します。
  • pull内部キューの最高水準点に達していないときに、データを取得するために繰り返し呼び出される関数
  • cancelストリームがキャンセルされたときに呼び出される関数。たとえば、cancel()受信側でメソッドが呼び出されます

これは、オブジェクト構造の骨の折れる例です。

const stream = new ReadableStream({
  start(controller) {

}, pull(controller) {

}, cancel(reason) {

} })

start()そしてpull()コントローラオブジェクトを取得します。これは、ReadableStreamDefaultControllerオブジェクト。ストリームの状態と内部キューを制御できます。

ストリームにデータを追加するには、controller.enqueue()データを保持する変数を渡す:

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello')
  }
})

ストリームを閉じる準備ができたら、controller.close()

cancel()取得しますreasonこれはに提供される文字列ですReadableStream.cancel()ストリームがキャンセルされたときのメソッド呼び出し。

キューイング戦略を決定するオプションの2番目のオブジェクトを渡すこともできます。 2つのプロパティが含まれています。

  • highWaterMark内部キューに格納できるチャンクの総数。話しているときにこれについて言及しましたpull()
  • size、バイトで表されるチャンクサイズを変更するために使用できるメソッド

    {
    highWaterMark,
    size()
    }
    

これらは主に、特にストリームの圧力を制御するために役立ちます。パイプチェーン、WebAPIではまだ実験的なものです。

いつhighWaterMarkストリームの値に到達すると、背圧信号はパイプ内の前のストリームに送信され、データ圧力を遅くするように指示します。

キューイング戦略を定義する2つの組み込みオブジェクトがあります。

  • ByteLengthQueuingStrategyこれは、チャンクのバイト単位の累積サイズが指定された最高水準点を超えるまで待機します
  • CountQueuingStrategyチャンクの累積数が指定された最高水準点を超えるまで待機します

32バイトの最高水準点を設定する例:

new ByteLengthQueuingStrategy({ highWaterMark: 32 * 1024 }

1チャンクの最高水準点を設定する例:

new CountQueuingStrategy({ highWaterMark: 1 })

私はあなたにあなたに言うためにこれに言及しますできるストリームに流入するデータの量を制御し、他のアクターと通信しますが、物事が非常に速く複雑になるため、詳細については説明しません。

ティーストリーム

以前、ストリームの読み取りを開始するとすぐにロックされ、他のリーダーは電話をかけるまでストリームにアクセスできないと述べましたreleaseLock()その上に。

ただし、を使用してストリームを複製することはできますtee()ストリーム自体のメソッド:

const stream = //...
const tees = stream.tee()

teesは、2つの新しいストリームを含む配列になりました。これを使用して、tees[0]そしてtees[1]

書き込み可能なストリーム

書き込み可能なストリームに関しては、3つのクラスのオブジェクトがあります。

  • WritableStream
  • WritableStreamDefaultReader
  • WritableStreamDefaultController

WritableStreamオブジェクトを使用して、後で消費できるストリームを作成できます。

これが、新しい書き込み可能なストリームを作成する方法です。

const stream = new WritableStream()

有用であるためには、オブジェクトを渡す必要があります。このオブジェクトには、次のオプションのメソッド実装があります。

  • start()オブジェクトが初期化されるときに呼び出されます
  • write()チャンクをシンクに書き込む準備ができたときに呼び出されます(書き込まれる前にストリームデータを保持する基礎となる構造)
  • close()チャンクを書き終えたときに呼び出されます
  • abort()エラーを通知したいときに呼び出されます

これがスケルトンです:

const stream = new WritableStream({
  start(controller) {

}, write(chunk, controller) {

}, close(controller) {

}, abort(reason) {

} })

start()close()そしてwrite()コントローラを通過し、WritableStreamDefaultControllerオブジェクトインスタンス。

はどうかと言うとReadableStream()、2番目のオブジェクトをに渡すことができますnew WritableStream()キューイング戦略を設定します。

たとえば、メモリに格納されている文字列を指定して、コンシューマーが接続できるストリームを作成するストリームを作成しましょう。

まず、受信したバイトを使用して文字に変換するために使用するデコーダーを定義します。エンコーディングAPI TextDecoder()コンストラクタ:

const decoder = new TextDecoder("utf-8")

WritableStreamを初期化して、close()メソッド。メッセージが完全に受信され、クライアントコードがそれを呼び出すと、コンソールに出力されます。

const writableStream = new WritableStream({
  write(chunk) {
    //...
  },
  close() {
    console.log(`The message is ${result}`)
  }
})

開始しますwrite()ArrayBufferを初期化し、それにチャンクを追加することによる実装。次に、Encoding APIのdecoder.decode()メソッドを使用して、バイトであるこのチャンクを文字にデコードします。次に、この値をに追加しますresultこのオブジェクトの外部で宣言する文字列:

let result

const writableStream = new WritableStream({ write(chunk) { const buffer = new ArrayBuffer(2) const view = new Uint16Array(buffer) view[0] = chunk const decoded = decoder.decode(view, { stream: true }) result += decoded }, close() { //… } })

これで、WritableStreamオブジェクトが初期化されます。

次に、このストリームを使用するクライアントコードを実装します。

私たちは最初にWritableStreamDefaultWriterからのオブジェクトwritableStreamオブジェクト:

const writer = writableStream.getWriter()

次に、送信するメッセージを定義します。

const message = 'Hello!'

次に、エンコーダを次のように初期化します。エンコードストリームに送信する文字:

const encoder = new TextEncoder()
const encoded = encoder.encode(message, { stream: true })

この時点で、文字列はバイト配列にエンコードされています。今、私たちは使用しますforEachこの配列をループして、各バイトをストリームに送信します。への各呼び出しの前にwrite()ストリームライターの方法、チェックしますreadypromiseを返すプロパティなので、ストリームライターの準備ができたときにのみ書き込みます。

encoded.forEach(chunk => {
  writer.ready.then(() => {
    return writer.write(chunk)
  })
})

私たちが今見逃しているのは、ライターを閉じることだけです。forEachは同期ループです。つまり、各項目が書き込まれた後にのみこのポイントに到達します。

私たちはまだチェックしますreadyプロパティの場合、close()メソッドを呼び出します。

writer.ready.then(() => {
  writer.close()
})

私の無料ダウンロードJavaScriptビギナーズハンドブック


その他のブラウザチュートリアル: