Node.js Streams

O que são streams

Streams são um dos conceitos fundamentais que empoderam aplicações Node.js.

Elas são um jeito de lidar com leitura/escrita de arquivos, comunicações em rede, ou qualquer tipo de troca de informação end-to-end de uma forma eficiente.

Streams não são um conceito único do Node.js. Elas foram introduzidas no sistema operacional Unix décadas atrás, permitindo que programas possam interagir uns com os outros passando streams, através do operador pipe (|).

Por exemplo, em uma abordagem tradicional, quando você diz ao programa para ler um arquivo, o arquivo é lido na memória, do começo ao fim, e então é processado.

Utilizando streams você lê pedaço por pedaço, processando o conteúdo sem mantê-lo completo na memória.

O módulo stream provê a fundação da qual todas APIs de streaming são feitas. Todas streams são instâncias do EventEmitter

Porquê streams

Streams basicamente nos dão duas grandes vantagens sobre outros métodos de manipulação de dados:

  • Eficiência de memória: você não precisa carregar grandes quantidades de dados em memória antes de ser capaz de processá-los
  • Eficiência temporal: menos tempo é requerido para começar a processar dados, uma vez que você pode começar a processar assim que os tiver, em vez de esperar até que toda carga útil de dados esteja disponível

Um exemplo de stream

Um exemplo típico é ler arquivos de um disco.

Usando o módulo fs do Node.js, você pode ler um arquivo, e serví-lo com HTTP quando uma nova conexão é estabelecida no seu servidor HTTP:

const http = require('http')
const fs = require('fs')
const server = http.createServer(function(req, res) {
fs.readFile(__dirname + '/data.txt', (err, data) => {
res.end(data)
})
})
server.listen(3000)

readFile() lê todo conteúdo do arquivo, e invoca uma função callback quando terminar.

res.end(data) na callbac vai retornar o conteúdo do arquivo para o client HTTP.

Se o arquivo for grande, a operação vai demorar um pouco. Aqui temos a mesma funcionalidade usando streams:

const http = require('http')
const fs = require('fs')
const server = http.createServer((req, res) => {
const stream = fs.createReadStream(__dirname + '/data.txt')
stream.pipe(res)
})
server.listen(3000)

Em vez de esperar até que o arquivo seja completamente lido, nós começamos a "streamá-lo" para o client HTTP assim que tivermos chunks de dados prontos para serem enviados.

pipe()

O exemplo acima usa stream.pipe(res): o método pipe() é chamado na stream de arquivo.

O que esse código faz? Ele pega a origem, e canaliza para um destino.

Você chama isso na stream origem, e nesse caso, a stream de arquivo é canalizada para a resposta HTTP.

O valor retornado pelo método pipe() é a stream de destino, que é algo muito conveniente pois nos permite encadear múltiplas chamadas pipe(), desse jeito:

src.pipe(dest1).pipe(dest2)

Isso faz o mesmo que

src.pipe(dest1)
dest1.pipe(dest2)

APIs Node.js baseadas em streams

Dadas as suas vantagens, muitos dos módulos principais do Node.js fornecem recursos nativos para manipulação de streams, sendo os mais notáveis:

  • process.stdin retorna uma stream conectada ao stdin
  • process.stdout retorna uma stream conectada ao stdout
  • process.stderr retorna uma stream conectada ao stderr
  • fs.createReadStream() cria uma stream de leitura para um arquivo
  • fs.createWriteStream() cria uma stream de escrita para um arquivo
  • net.connect() inicia uma stream baseada em conexão
  • http.request() retorna uma instância da classe http.ClientRequest, que é uma stream de escrita
  • zlib.createGzip() comprime dados usando gzip (um algoritmo de compressão) em uma stream
  • zlib.createGunzip() descomprime uma stream gzip.
  • zlib.createDeflate() comprime dados usando deflate (um algoritmo de compressão) em uma stream
  • zlib.createInflate() descomprime uma stream deflate.

Diferentes tipos de streams

Existem quatro classes de streams:

  • Readable: uma stream de onde você pode canalizar, mas não canalizar dados nela (você pode receber dados, mas não pode enviar dados). Quando você coloca dados em uma stream de leitura, os dados são bufferizados, até que um consumer comece a ler os dados.
  • Writable: uma stream em que você pode canalizar dados, mas não obter dados dela (você pode enviar dados, mas não pode receber).
  • Duplex: uma stream em que você pode tanto canalizar dados quando obtê-los, basicamente uma combinação de uma stream Readable e uma Writable
  • Transform: uma stream Transform é similar a uma Duplex, mas a saída é um transform das entradas

Como criar uma stream de leitura (readable stream)

Nós obtêmos a stream de leitura pelo módulo stream, a inicializamos e implementamos o método readable._read().

Primeiro crie um objeto stream:

const Stream = require('stream')
const readableStream = new Stream.Readable()

então implemente o _read:

readableStream._read = () => {}

Você também pode implementar o _read usando a opção read:

const readableStream = new Stream.Readable({
read() {}
})

Agora que a stream está inicializada, nós podemos enviar dados a ela:

readableStream.push('hi!')
readableStream.push('ho!')

Como criar uma stream de escrita (writable stream)

Para criar uma stream de escrita nós extendemos o objeto base Writable, e implementamos seu método \ _write().

Primeiro crie um objeto stream:

const Stream = require('stream')
const writableStream = new Stream.Writable()

e então implemente o _write:

writableStream._write = (chunk, encoding, next) => {
console.log(chunk.toString())
next()
}

Agora você pode canalizar uma stream de leitura nele:

process.stdin.pipe(writableStream)

Como obtêr dados de uma stream de leitura

Como nós lemos dados de uma stream de leitura? Usando uma stream de escrita:

const Stream = require('stream')
const readableStream = new Stream.Readable({
read() {}
})
const writableStream = new Stream.Writable()
writableStream._write = (chunk, encoding, next) => {
console.log(chunk.toString())
next()
}
readableStream.pipe(writableStream)
readableStream.push('hi!')
readableStream.push('ho!')

Você também pode consumir uma stream de leitura diretamente, usando o evento readble:

readableStream.on('readable', () => {
console.log(readableStream.read())
})

Como enviar dados para uma stream de escrita

Usando o método write() da stream:

writableStream.write('hey!\n')

Sinalizando a uma stream de escrita que você terminou de escrever

Use o método end():

const Stream = require('stream')
const readableStream = new Stream.Readable({
read() {}
})
const writableStream = new Stream.Writable()
writableStream._write = (chunk, encoding, next) => {
console.log(chunk.toString())
next()
}
readableStream.pipe(writableStream)
readableStream.push('hi!')
readableStream.push('ho!')
writableStream.end()