nodejs: leia do arquivo e armazene no db, limite o máximo de operações simultâneas do db

Eu tenho um arquivo CSV no qual estou lendo como fluxo e usando transformações para converter em JSON e depois armazenar de forma assíncrona cada linha em um banco de dados.

O problema é que a leitura do arquivo é rápida e, portanto, leva a um número muito grande de operações assíncronas de banco de dados simultâneas, o que faz com que o aplicativo pare.

Gostaria de limitar o aplicativo para que um máximo de N operações de banco de dados pendentes estejam em andamento a qualquer momento.

Este é o núcleo básico da minha função _transform:

parser._transform = function(data, encoding, done) {
    //push data rows
    var tick = this._parseRow(data);

    //Store tick
    db.set(tick.date, tick, function(err, result) {
      console.log(result);
      if(err) throw err;
    });

    this.push(tick);
    done();
};

Analisei algumas opções, mas essas pareciam os melhores candidatos:

Use a API assíncrona 'forEachLimit'O problema que vejo aqui é que, na transformação do meu fluxo, estou operando apenas em um objeto (linha do arquivo) ao emitir operações.Não é possível ler todo o arquivo devido ao tamanhoUse uma solução assíncrona, paralela, limitada por simultaneidade, conforme descrito aqui, na seção 7.2.3:http://book.mixu.net/node/ch7.htmlO problema para mim aqui é o que fazer no caso de 'limite' ser atingido.Girar ou usar o setTimeout parece consumir todo o horário agendado e evita que meus retornos de chamada do banco de dados, que devem diminuir o início do contador 'em execução'.

Estas foram minhas tentativas iniciais da 'solução limitada por simultaneidade':

var limit = 100;
var running = 0;

parser._transform = function(data, encoding, done) {
  //push data rows
  var tick = this._parseRow(data);

  this.push(tick);
  //Store tick to db
  if (running < limit) {
    console.log("limit not reached, scheduling set");
    running++;
    cb.set(tick.date, tick, function(err, result) {
      running--;
      console.log("running is:" + running);
      console.log(result);
      if(err) throw err;
    });
  } else {
    console.log("max limit reached, sleeping");
    setTimeout(this._transform(data, encoding, done),1000);
  }
  done();
};

Eu só comecei o node.js esta semana, então não estou claro qual é o modelo correto para resolver isso.

Nota: Estou ciente de que, pelo menos, isso deve ser um recuo exponencial ao usar o último modelo, e deve haver algum sistema de 'max backoffs' em vigor, para não explodir a pilha de chamadas. Tentei mantê-lo simples aqui por enquanto.

questionAnswers(2)

yourAnswerToTheQuestion