Blog

Data Streaming en NodeJS

Jesús Dapena Gómez

Streams

La palabra inglesa stream tiene distintas traducciones en español, riachuelo, arroyo, torrente, corriente, flujo…  todas relacionadas con la idea de un movimiento fluido continuo de algo hacia algún sitio, por lo que, dentro del contexto de la programación y el desarrollo de software, podríamos explicar el concepto de data streaming como el proceso de transmisión de datos entre dos puntos, como si fueran transportados por un río.

Los Streams se pueden utilizar, por ejemplo, cuando necesitamos transmitir muchos datos que pueden cortarse en trozos pequeños para su envío, o cuando se va a realizar la transmisión de muchos datos entre dos partes. Estos datos “troceados” no aportan información de forma independiente, pero cobran sentido cuando volvemos a unirlos en su destino.

NodeJS dispone de un módulo de Stream con un API para implementar su uso que es de las más útiles de Node y al mismo tiempo de las menos utilizadas, ya que presenta varias dificultades que pueden disuadir de su uso.

En este artículo os explicamos los tipos de streams y los eventos, conceptos que pueden ayudaros cuando necesitéis utilizarla.

Tipos de Streams

A la hora de programar es necesario especificar el tipo de Stream:

  • Readable: streams que permiten extraer los datos desde la fuente. Hay varios modos de funcionamiento diferentes.
  • Writable: streams que permiten escribir en una base de datos u otro programa.
  • Duplex: streams que funciona a la vez como Readable y Writable.
  • Transform: streams que se encuentra en medio de la cadena y que permite modificar los datos que por pasan él.

Los tipos readable y writable son los más usados, por lo que, a continuación, vamos a explicarlos más en detalle.

Readable

Permite extraer o consumir datos desde un origen, ya sea un programa diferente, una base de datos, u otra parte del mismo programa. 

Lo más destacable del readable Stream son sus modos de funcionamiento:

  • Flowing mode: el stream lee directamente de la fuente de datos y de manera automática los transmite al destino.
  • Paused mode: el stream debe ser llamado para leer de la fuente de datos y luego transmitirlos.

Es importante destacar algunas diferencias importantes entre estos dos modos:

  • En el flowing mode se debe implementar el método _read de la clase. Este método se llama de manera automática cuando el destino pide más datos, es decir, cuando puede consumir más.
  • En paused mode no se implementa el método _read si no que cuando el stream dispone de datos los transmite al destino
  • En el flowing mode el mecanismo que regula cuando leer datos y transmitirlos al destino es automático.
  • En el modo paused el mecanismo que regula cuando leer datos y transmitirlos al destino debe ser implementado por el usuario a partir de eventos.


Flowing mode

Una aplicación del flowing mode sería leer de una base de datos que bloquee la lectura hasta que haya nuevos datos, por ejemplo, una lectura bloqueante de Redis.

De esta manera cuando el destino (por ejemplo, un writable) puede leer más datos de manera automática se llamará al evento _read del readable. En este método se lee de la base de datos, y cuando haya un dato disponible se transmitirá al writable. 

La gestión de cuándo leer o parar de leer (llamar al método _read) se realiza de manera automática si el destino puede consumir más datos o no, esto es lo que se conoce como backPressure, es decir, nunca se van a transmitir más datos de los que se pueden procesar.

Paused mode

Una aplicación del paused mode sería cuando queremos leer de una fuente de datos que no presenta una lectura bloqueante, o bien cuando los datos pueden llegar de manera asíncrona, por ejemplo, si la fuente de datos es una aplicación que emite eventos “on(data)”.

No se implementaría el método _read ya que no disponemos de ningun endpoint para preguntar a la fuente de datos, se implementaría el on(‘data’) de la fuente de datos.

Cada vez que tengamos datos disponibles en el listener lo transmitiremos al destino.

Esta vez la gestión de los eventos no se realiza de manera automática, por lo que tendremos que implementar los siguientes eventos:

  • pause’: el destino, el writable, no puede consumir más datos, ya que se ha llegado al máximo del buffer que puede procesar, por lo que deberemos parar de escuchar los datos que llegan o parar de preguntar.
  • drain’: este evento indica que el writable se ha liberado, y que por lo tanto es capaz de seguir consumiendo, por lo que deberemos reanudar la lectura de los datos.


Writable

A través de un buffer recibe datos que pueden ser escritos en el destino que nosotros deseemos. Una vez escrito el dato se notifica para que pueda seguir consumiendo más datos.

Al igual que en el caso anterior, según el método implementado hay varios modos de funcionamiento:

  • _write (modo básico de funcionamiento): cada vez que el readable introduzca un dato en el stream éste llegará al writable. 

Una vez tenemos el dato lo podemos escribir en la base de datos.

  • _writev: si solamente se implementa el método writev el funcionamiento es ligeramente diferente. En este caso en lugar de recibir dato por dato, se recibirá un array con todos los datos que ha escrito el readable en el buffer.

Una vez tenemos un array con todos estos datos se pueden guardar de golpe en la base de datos. Esto resulta útil cuando las latencias en la escritura de datos son altas y la base de datos permite hacer bulks múltiples.

  • _write y writev: si se implementan ambos métodos se deja al sistema de node que escoja el funcionamiento del writable. 

En un inicio cada vez que llegue un dato al buffer lo transmitirá al _write, si se van a acumulando los datos en el buffer porque el readable lee más rápido que el writable, cuando se llene el buffer se enviará un array de datos al writev para escribir de golpe en la base de datos.

A parte de la implementación de estos dos métodos es importante señalar que en el caso de que se devuelve error en el write o en el writev se desacoplarán de manera automática el readable y writable, por lo que se parará de leer.


Eventos

Si utilizamos el modo flowing del readable el número de eventos que deberemos de escuchar para un correcto funcionamiento será menor. Aun así, de manera general, los eventos a escuchar para gestionarlo correctamente serán:

Writable

  • Close: cuando el stream se ha cerrado, quiere decir que no va a procesar más datos ni se van a emitir más eventos.
  • Drain: indica cuando el stream puede volver a leer datos. Por ejemplo, si el readable stream está en modo paused se utiliza este evento para poder volver a leer datos.
  • Error: se ha lanzado un error. Es llamado cuando en el método _write o _writev se devuelve un error en la callback.
  • Finish: cuando se ha procesado toda la información y no hay más datos en el buffer.
  • Pipe: es generado cuando se han conectado con otro stream de manera satisfactoria.
  • Unpipe: llamado cuando se ha desconectado de otro stream. Cuando se devuelve un error en _write o _writev se genera este evento de desconexión de ambos stream.

Readable

  • Close: el stream se ha cerrado y por lo tanto no se leerán más datos.
  • Data: generado cuando el usuario llama al evento push del readable. Si se suscribe aparecerán los datos enviados en el push. 
  • End: cuando se han transmitidos todos los datos.
  • Error: ha ocurrido un error dentro del _read mientras se trataban los datos.
  • Pause: en modo paused mode el readable está pausado, no seguirá leyendo hasta que se le indique.
  • Readable: no debe ser implementado ni escuchado, ya que es un evento interno.
  • Resume: llamado cuando se vuelven a procesar datos.
Texto
Jesús Dapena Gómez
Edición de texto
Marisa Perdices Castillo
Edición de imagen
Lorena Heredia, © Gerd Altmann (Pixabay)
Software
Backend
Node
NodeJS
Data Streaming
¿Te ha gustado?
Compártelo en redes sociales
Suscríbete a
nuestra newsletter

Mantente al día de todas las novedades Mytra: proyectos, eventos, noticias…