NodeJS, promessas, fluxos - processando arquivos CSV grandes

Preciso criar uma função para processar arquivos CSV grandes para uso em uma chamada bluebird.map (). Dado o tamanho potencial do arquivo, eu gostaria de usar o streaming.

Essa função deve aceitar um fluxo (um arquivo CSV) e uma função (que processa os pedaços do fluxo) e retornar uma promessa quando o arquivo for lido até o final (resolvido) ou erros (rejeitado).

Então, eu começo com:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  // use readable or data event?
  parser.on('readable', function() {
    // call processor, which may be async
    // how do I throttle the amount of promises generated
  });

  var db = pgp(api.config.mailroom.fileMakerDbConfig);

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });

}

Agora, tenho dois problemas inter-relacionados:

Preciso limitar a quantidade real de dados sendo processados, para não criar pressões de memória.A função passou como oprocessor O parâmetro frequentemente será assíncrono, como salvar o conteúdo do arquivo no banco de dados por meio de uma biblioteca baseada em promessas (agora:pg-promise) Como tal, criará uma promessa na memória e seguirá repetidamente.

opg-promise biblioteca possui funções para gerenciar isso, comopágina(), mas não consigo entender como misturar manipuladores de eventos de fluxo com esses métodos de promessa. No momento, retorno uma promessa no manipulador parareadable seção após cadaread(), o que significa que eu crio uma quantidade enorme de operações prometidas do banco de dados e eventualmente falho porque atingi um limite de memória do processo.

Alguém tem um exemplo prático disso que eu possa usar como ponto de partida?

ATUALIZAR: Provavelmente mais de uma maneira de esfolar o gato, mas isso funciona:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  // some checks trimmed out for example

  var db = pgp(api.config.mailroom.fileMakerDbConfig);
  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  var readDataFromStream = function(index, data, delay) {
    var records = [];
    var record;
    do {
      record = parser.read();
      if(record != null)
        records.push(record);
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
    parser.pause();

    if(records.length)
      return records;
  };

  var processData = function(index, data, delay) {
    console.log('processData(' + index + ') > data: ', data);
    parser.resume();
  };

  parser.on('readable', function() {
    db.task(function(tsk) {
      this.page(readDataFromStream, processData);
    });
  });

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });
}

Alguém vê um problema em potencial com essa abordagem?

questionAnswers(3)

yourAnswerToTheQuestion