NodeJS, Versprechen, Streams - Verarbeitung großer CSV-Dateien

Ich muss eine Funktion zum Verarbeiten großer CSV-Dateien für die Verwendung in einem bluebird.map () -Aufruf erstellen. Angesichts der möglichen Dateigrößen würde ich gerne Streaming verwenden.

Diese Funktion sollte einen Stream (eine CSV-Datei) und eine Funktion (die die Blöcke aus dem Stream verarbeitet) akzeptieren und ein Versprechen zurückgeben, wenn die Datei bis zum Ende gelesen (behoben) oder Fehler (zurückgewiesen) wird.

Also, ich beginne mit:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  // use readable or data event?
  parser.on('readable', function() {
    // call processor, which may be async
    // how do I throttle the amount of promises generated
  });

  var db = pgp(api.config.mailroom.fileMakerDbConfig);

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });

}

Nun, ich habe zwei miteinander zusammenhängende Probleme:

Ich muss die tatsächlich verarbeitete Datenmenge drosseln, um keinen Speicherdruck zu erzeugen. Die als @ übergebene Funktiprocessor param wird häufig asynchron sein, z. B. das Speichern des Inhalts der Datei in der Datenbank über eine Bibliothek, die auf Versprechungen basiert (im Moment:pg-promise). Als solches wird es ein Versprechen im Gedächtnis schaffen und wiederholt weitermachen.

Daspg-promise library hat Funktionen, um dies zu verwalten, wie zBSeite(, aber ich bin nicht in der Lage, vorwegzunehmen, wie Stream-Event-Handler mit diesen Versprechungsmethoden gemischt werden. Im Moment gebe ich im Handler ein Versprechen für @ zurücreadable Abschnitt nach jedemread(), was bedeutet, dass ich eine große Menge an versprochenen Datenbankoperationen erstelle und schließlich ausfalle, weil ich ein Prozessspeicherlimit erreicht habe.

Hat jemand ein funktionierendes Beispiel dafür, das ich als Sprungpunkt verwenden kann?

AKTUALISIERE: Wahrscheinlich mehr als eine Möglichkeit, die Katze zu häuten, aber das funktioniert:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  // some checks trimmed out for example

  var db = pgp(api.config.mailroom.fileMakerDbConfig);
  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  var readDataFromStream = function(index, data, delay) {
    var records = [];
    var record;
    do {
      record = parser.read();
      if(record != null)
        records.push(record);
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
    parser.pause();

    if(records.length)
      return records;
  };

  var processData = function(index, data, delay) {
    console.log('processData(' + index + ') > data: ', data);
    parser.resume();
  };

  parser.on('readable', function() {
    db.task(function(tsk) {
      this.page(readDataFromStream, processData);
    });
  });

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });
}

Sieht jemand ein potenzielles Problem mit diesem Ansatz?

Antworten auf die Frage(6)

Ihre Antwort auf die Frage