Cómo llamar a una función asincrónica dentro de un flujo legible de node.js

Este es un breve ejemplo de la implementación de una secuencia legible personalizada. La clase se llama MyStream. La secuencia saca los nombres de archivo / carpeta de un directorio y empuja los valores al evento de datos.

Para comparar, implementé (en este ejemplo) dos formas / funciones diferentes. Uno es sincrónico y el otro es asíncrono. El segundo argumento del constructor le permite decidir, de qué manera se utiliza (verdadero para el asíncrono y falso para sincrónico.

El contador de lectura cuenta el número de veces que se llama al método _read. Solo para dar un comentario.

var Readable = require('stream').Readable;
var util = require('util');
var fs = require('fs');
util.inherits(MyStream, Readable);

function MyStream(dirpath, async, opt) {
  Readable.call(this, opt);
  this.async = async;
  this.dirpath = dirpath;
  this.counter = 0;
  this.readcounter = 0;
}

MyStream.prototype._read = function() {
  this.readcounter++;
  if (this.async === true){
    console.log("Readcounter: " + this.readcounter);
    that = this;
    fs.readdir(this.dirpath,function(err, files){
      that.counter ++;
      console.log("Counter: " + that.counter);
      for (var i = 0; i < files.length; i++){
        that.push(files[i]);
      }
      that.push(null);
    });
  } else {
    console.log("Readcounter: " + this.readcounter);
    files = fs.readdirSync(this.dirpath)
    for (var i = 0; i < files.length; i++){
      this.push(files[i]);
    };
    this.push(null);
  }
};
//Instance for a asynchronous call
mystream = new MyStream('C:\\Users', true);
mystream.on('data', function(chunk){
  console.log(chunk.toString());
});

La forma sincrónica funciona como se esperaba, pero algo interesante está sucediendo cuando lo llamo de forma asincrónica. Cada vez que se empuja el nombre del archivo a través dethat.push(files[i]) Se vuelve a llamar al método _read. Lo que causa errores, cuando finaliza el primer ciclo asíncrono ythat.push(null) define el final de la secuencia.

El entorno que estoy usando para probar esto: nodo 4.1.1, Electrón 0.35.2.

No entiendo por qué _read se llama tan a menudo y por qué sucede esto. Tal vez es un error? ¿O hay algo que no veo en este momento? ¿Hay alguna manera de construir una secuencia legible mediante el uso de funciones asincrónicas? Empujar los trozos asincrónicamente sería realmente genial, porque sería la forma de transmisión sin bloqueo. Especialmente cuando tienes una mayor cantidad de datos.

Respuestas a la pregunta(1)

Su respuesta a la pregunta