NodeJS, promesas, flujos: procesamiento de grandes archivos CSV

Necesito crear una función para procesar grandes archivos CSV para usar en una llamada bluebird.map (). Dado el tamaño potencial del archivo, me gustaría usar la transmisión.

Esta función debe aceptar una secuencia (un archivo CSV) y una función (que procesa los fragmentos de la secuencia) y devolver una promesa cuando el archivo se lee hasta el final (resuelto) o errores (rechazado).

Entonces, empiezo con:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  // use readable or data event?
  parser.on('readable', function() {
    // call processor, which may be async
    // how do I throttle the amount of promises generated
  });

  var db = pgp(api.config.mailroom.fileMakerDbConfig);

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });

}

Ahora, tengo dos problemas relacionados entre sí:

Necesito acelerar la cantidad real de datos que se procesan, para no crear presiones de memoria.La función pasó comoprocessor Param a menudo será asíncrono, como guardar el contenido del archivo en la base de datos a través de una biblioteca basada en promesas (en este momento:pg-promise) Como tal, creará una promesa en la memoria y seguirá adelante, repetidamente.

lospg-promise la biblioteca tiene funciones para administrar esto, comopágina(), pero no puedo comprender cómo mezclar controladores de eventos de transmisión con estos métodos prometedores. En este momento, devuelvo una promesa en el controlador parareadable sección después de cadaread(), lo que significa que creo una gran cantidad de operaciones de base de datos prometidas y eventualmente falla porque llegué a un límite de memoria de proceso.

¿Alguien tiene un ejemplo de esto que pueda usar como punto de salto?

ACTUALIZAR: Probablemente más de una forma de desollar al gato, pero esto funciona:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  // some checks trimmed out for example

  var db = pgp(api.config.mailroom.fileMakerDbConfig);
  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  var readDataFromStream = function(index, data, delay) {
    var records = [];
    var record;
    do {
      record = parser.read();
      if(record != null)
        records.push(record);
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
    parser.pause();

    if(records.length)
      return records;
  };

  var processData = function(index, data, delay) {
    console.log('processData(' + index + ') > data: ', data);
    parser.resume();
  };

  parser.on('readable', function() {
    db.task(function(tsk) {
      this.page(readDataFromStream, processData);
    });
  });

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });
}

¿Alguien ve un problema potencial con este enfoque?

Respuestas a la pregunta(3)

Su respuesta a la pregunta