Node.js потоки

Узнайте, для чего нужны потоки, почему они так важны и как их использовать.

Что такое стримы

Потоки - одна из фундаментальных концепций, лежащих в основе приложений Node.js.

Они позволяют эффективно обрабатывать чтение / запись файлов, сетевое взаимодействие или любой вид сквозного обмена информацией.

Потоки - это не уникальная концепция Node.js. Они были введены в операционную систему Unix несколько десятилетий назад, и программы могут взаимодействовать друг с другом, передавая потоки через оператор канала (|).

Например, традиционным способом, когда вы говорите программе прочитать файл, файл считывается в память от начала до конца, а затем вы его обрабатываете.

Используя потоки, вы читаете его по частям, обрабатывая его содержимое, не сохраняя его в памяти.

Node.jsstreamмодульобеспечивает основу, на которой построены все API-интерфейсы потоковой передачи.

Почему стримы

Потоки в основном предоставляют два основных преимущества при использовании других методов обработки данных:

  • Эффективность памяти: вам не нужно загружать большие объемы данных в память, прежде чем вы сможете их обработать
  • Эффективность времени: требуется гораздо меньше времени, чтобы начать обработку данных, как только они у вас есть, вместо того, чтобы ждать, пока вся полезная нагрузка данных будет доступна для запуска

Пример потока

Типичный пример - чтение файлов с диска.

Использование узлаfsмодуль вы можете читать файл и обслуживать его через HTTP, когда новое соединение установлено с вашим http-сервером:

const http = require('http')
const fs = require('fs')

const server = http.createServer(function (req, res) { fs.readFile(__dirname + ‘/data.txt’, (err, data) => { res.end(data) }) }) server.listen(3000)

readFile()читает все содержимое файла и по завершении вызывает функцию обратного вызова.

res.end(data)в обратном вызове вернет содержимое файла HTTP-клиенту.

Если файл большой, операция займет довольно много времени. Вот то же самое, написанное с использованием потоков:

const http = require('http')
const fs = require('fs')

const server = http.createServer((req, res) => { const stream = fs.createReadStream(__dirname + ‘/data.txt’) stream.pipe(res) }) server.listen(3000)

Вместо того, чтобы ждать, пока файл будет полностью прочитан, мы начинаем его потоковую передачу HTTP-клиенту, как только у нас есть фрагмент данных, готовый к отправке.

трубка()

В приведенном выше примере используется строкаstream.pipe(res): thepipe()вызывается в файловом потоке.

Что делает этот код? Он берет источник и направляет его в пункт назначения.

Вы вызываете его в исходном потоке, поэтому в этом случае файловый поток передается по конвейеру в ответ HTTP.

Возвращаемое значениеpipe()- это целевой поток, что очень удобно, позволяя связать несколькоpipe()звонки, вот так:

src.pipe(dest1).pipe(dest2)

Эта конструкция аналогична выполнению

src.pipe(dest1)
dest1.pipe(dest2)

API-интерфейсы узлов на основе потоков

Благодаря своим преимуществам многие базовые модули Node.js предоставляют встроенные возможности обработки потоков, в частности:

  • process.stdinвозвращает поток, подключенный к stdin
  • process.stdoutвозвращает поток, подключенный к stdout
  • process.stderrвозвращает поток, подключенный к stderr
  • fs.createReadStream()создает читаемый поток в файл
  • fs.createWriteStream()создает доступный для записи поток в файл
  • net.connect()инициирует потоковое соединение
  • http.request()возвращает экземпляр класса http.ClientRequest, который является доступным для записи потоком
  • zlib.createGzip()сжимать данные с помощью gzip (алгоритм сжатия) в поток
  • zlib.createGunzip()распаковать поток gzip.
  • zlib.createDeflate()сжатие данных с помощью deflate (алгоритм сжатия) в поток
  • zlib.createInflate()распаковать спущенный поток

Различные типы потоков

Есть четыре класса потоков:

  • Readable: поток, из которого вы можете перенаправить, но не в него (вы можете получать данные, но не отправлять в него данные). Когда вы помещаете данные в читаемый поток, они буферизуются до тех пор, пока потребитель не начнет читать данные.
  • Writable: поток, в который вы можете подключиться, но не из него (вы можете отправлять данные, но не получать от них)
  • Duplex: поток, в который можно передавать и передавать по каналу, в основном комбинация потока с возможностью чтения и записи.
  • Transform: поток преобразования похож на дуплексный, но вывод является преобразованием его ввода

Как создать читаемый поток

Получаем поток Readable изstreamмодуль, и мы его инициализируем

const Stream = require('stream')
const readableStream = new Stream.Readable()

Теперь, когда поток инициализирован, мы можем отправлять в него данные:

readableStream.push('hi!')
readableStream.push('ho!')

Как создать доступный для записи поток

Чтобы создать доступный для записи поток, мы расширяем базуWritableобъект, и мы реализуем его метод _write ().

Сначала создайте объект потока:

const Stream = require('stream')
const writableStream = new Stream.Writable()

затем реализовать_write:

writableStream._write = (chunk, encoding, next) => {
    console.log(chunk.toString())
    next()
}

Теперь вы можете направить читаемый поток в:

process.stdin.pipe(writableStream)

Как получить данные из читаемого потока

Как мы читаем данные из читаемого потока? Использование записываемого потока:

const Stream = require('stream')

const readableStream = new Stream.Readable() const writableStream = new Stream.Writable()

writableStream._write = (chunk, encoding, next) => { console.log(chunk.toString()) next() }

readableStream.pipe(writableStream)

readableStream.push(‘hi!’) readableStream.push(‘ho!’)

Вы также можете напрямую использовать читаемый поток, используяreadableмероприятие:

readableStream.on('readable', () => {
  console.log(readableStream.read())
})

Как отправить данные в поток с возможностью записи

Использование потокаwrite()метод:

writableStream.write('hey!\n')

Сигнализация записываемого потока, который вы закончили писать

Использоватьend()метод:

const Stream = require('stream')

const readableStream = new Stream.Readable() const writableStream = new Stream.Writable()

writableStream._write = (chunk, encoding, next) => { console.log(chunk.toString()) next() }

readableStream.pipe(writableStream)

readableStream.push(‘hi!’) readableStream.push(‘ho!’)

writableStream.end()

Скачать мою бесплатнуюСправочник по Node.js


Дополнительные руководства по узлам: