nodejs: Aus Datei lesen und in Datenbank speichern, maximale Anzahl gleichzeitiger Datenbankoperationen begrenzen

Ich habe eine CSV-Datei, die ich als Stream einlese und mithilfe von Transformationen in JSON konvertiere und dann jede Zeile asynchron in einer Datenbank speichere.

Das Problem ist, dass das Lesen aus der Datei schnell ist und daher zu sehr vielen gleichzeitigen asynchronen DB-Vorgängen führt, wodurch die App zum Stillstand kommt.

Ich möchte die App so einschränken, dass zu einem bestimmten Zeitpunkt maximal N ausstehende DB-Vorgänge ausgeführt werden.

Dies ist der grundlegende Kern meiner _transform-Funktion:

parser._transform = function(data, encoding, done) {
    //push data rows
    var tick = this._parseRow(data);

    //Store tick
    db.set(tick.date, tick, function(err, result) {
      console.log(result);
      if(err) throw err;
    });

    this.push(tick);
    done();
};

Ich habe einige Optionen geprüft, aber diese schienen die besten Kandidaten zu sein:

Verwenden Sie die asynchrone API 'forEachLimit'Das Problem, das ich hier sehe, ist, dass ich in meiner Stream-Transformation nur ein Objekt (Zeile aus Datei) bearbeite, wenn ich Vorgänge ausstelle.Das Einlesen der gesamten Datei ist aufgrund der Größe nicht möglichVerwenden Sie eine asynchrone, parallele, auf Parallelität beschränkte Lösung, wie in Abschnitt 7.2.3 beschrieben:http://book.mixu.net/node/ch7.htmlDas Problem für mich hier ist, was zu tun ist, wenn das Limit erreicht ist.Das Drehen oder Verwenden von setTimeout scheint die gesamte geplante Zeit zu verbrauchen und verhindert, dass meine DB-Rückrufe, die den "laufenden" Zähler dekrementieren sollten, ausgelöst werden.

Dies waren meine ersten Versuche mit der "Concurrency Limited Solution":

var limit = 100;
var running = 0;

parser._transform = function(data, encoding, done) {
  //push data rows
  var tick = this._parseRow(data);

  this.push(tick);
  //Store tick to db
  if (running < limit) {
    console.log("limit not reached, scheduling set");
    running++;
    cb.set(tick.date, tick, function(err, result) {
      running--;
      console.log("running is:" + running);
      console.log(result);
      if(err) throw err;
    });
  } else {
    console.log("max limit reached, sleeping");
    setTimeout(this._transform(data, encoding, done),1000);
  }
  done();
};

Ich habe erst diese Woche mit node.js begonnen, daher ist mir nicht klar, welches Modell für die Lösung das richtige ist.

Hinweis: Ein paar Dinge, die mir bewusst sind, sind, dass dies zumindest ein exponentieller Backoff sein sollte, wenn das letztere Modell verwendet wird, und dass ein System mit maximalen Backoffs vorhanden sein sollte, um den Call-Stack nicht zu sprengen. Versucht es hier fürs Erste einfach zu halten.

Antworten auf die Frage(2)

Ihre Antwort auf die Frage