nodejs: lea del archivo y almacene en db, limite las operaciones db simultáneas máximas

Tengo un archivo CSV que estoy leyendo como una secuencia, y uso transformaciones para convertir a JSON y luego almacenar asíncronamente cada línea en una base de datos.

El problema es que la lectura del archivo es rápida y, por lo tanto, genera un gran número de operaciones simultáneas de DB asíncronas, lo que hace que la aplicación se detenga.

Me gustaría limitar la aplicación de manera tal que un máximo de N operaciones de DB pendientes estén en progreso en un momento dado.

Este es el núcleo básico de mi función _transform:

parser._transform = function(data, encoding, done) {
    //push data rows
    var tick = this._parseRow(data);

    //Store tick
    db.set(tick.date, tick, function(err, result) {
      console.log(result);
      if(err) throw err;
    });

    this.push(tick);
    done();
};

He visto algunas opciones, pero estos parecían los mejores candidatos:

Utilice la API asincrónica 'forEachLimit'El problema que veo aquí es que en mi transformación de flujo, solo estoy operando en un objeto (línea del archivo) al emitir operaciones.Leer el archivo completo no es factible debido al tamañoUse una solución asíncrona, paralela y de concurrencia limitada como se describe aquí, en la sección 7.2.3:http://book.mixu.net/node/ch7.htmlEl problema para mí aquí es qué hacer en caso de que se alcance el 'límite'.Girar o usar setTimeout parece agotar todo el tiempo programado y evita que mis devoluciones de llamada de DB disminuyan el inicio del contador 'en ejecución'.

Estos fueron mis intentos iniciales en la 'solución limitada de concurrencia':

var limit = 100;
var running = 0;

parser._transform = function(data, encoding, done) {
  //push data rows
  var tick = this._parseRow(data);

  this.push(tick);
  //Store tick to db
  if (running < limit) {
    console.log("limit not reached, scheduling set");
    running++;
    cb.set(tick.date, tick, function(err, result) {
      running--;
      console.log("running is:" + running);
      console.log(result);
      if(err) throw err;
    });
  } else {
    console.log("max limit reached, sleeping");
    setTimeout(this._transform(data, encoding, done),1000);
  }
  done();
};

Solo comencé node.js esta semana, así que no estoy claro cuál es el modelo correcto para resolver esto.

Nota:&nbsp;Un par de cosas de las que estoy al tanto es que esto debería ser al menos un retroceso exponencial si se usa el último modelo, y debería haber algún sistema de 'retroceso máximo' en el lugar, para no volar la pila de llamadas. Sin embargo, intenté mantenerlo simple aquí por ahora.