NodeJS, обещания, потоки - обработка больших файлов CSV

Мне нужно создать функцию для обработки больших файлов CSV для использования в вызове bluebird.map (). Учитывая потенциальные размеры файла, я хотел бы использовать потоковую передачу.

Эта функция должна принимать поток (файл CSV) и функцию (которая обрабатывает фрагменты из потока) и возвращать обещание, когда файл читается до конца (разрешается) или ошибок (отклоняется).

Итак, я начну с:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  // use readable or data event?
  parser.on('readable', function() {
    // call processor, which may be async
    // how do I throttle the amount of promises generated
  });

  var db = pgp(api.config.mailroom.fileMakerDbConfig);

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });

}

Теперь у меня есть две взаимосвязанные проблемы:

Мне нужно регулировать фактический объем обрабатываемых данных, чтобы не создавать нагрузку на память.Функция передана какprocessor param часто будет асинхронным, например, сохраняя содержимое файла в БД через библиотеку, основанную на обещаниях (прямо сейчас:pg-promise). Как таковой, он создаст обещание в памяти и будет многократно двигаться дальше.

pg-promise Библиотека имеет функции для управления этим, какстраница (), но я не могу обернуться, как смешивать потоковые обработчики событий с этими методами обещаний. Прямо сейчас я возвращаю обещание в обработчике дляreadable раздел после каждогоread()Это означает, что я создаю огромное количество обещанных операций с базой данных и, в конце концов, выхожу из строя, потому что достиг предела памяти процесса.

У кого-нибудь есть рабочий пример этого, который я могу использовать в качестве отправной точки?

ОБНОВИТЬВозможно, это не один из способов снятия кожи с кошки, но это работает:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  // some checks trimmed out for example

  var db = pgp(api.config.mailroom.fileMakerDbConfig);
  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  var readDataFromStream = function(index, data, delay) {
    var records = [];
    var record;
    do {
      record = parser.read();
      if(record != null)
        records.push(record);
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
    parser.pause();

    if(records.length)
      return records;
  };

  var processData = function(index, data, delay) {
    console.log('processData(' + index + ') > data: ', data);
    parser.resume();
  };

  parser.on('readable', function() {
    db.task(function(tsk) {
      this.page(readDataFromStream, processData);
    });
  });

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });
}

Кто-нибудь видит потенциальную проблему с этим подходом?

Ответы на вопрос(3)

Ваш ответ на вопрос