NodeJS, обещания, потоки - обработка больших файлов CSV
Мне нужно создать функцию для обработки больших файлов CSV для использования в вызове bluebird.map (). Учитывая потенциальные размеры файла, я хотел бы использовать потоковую передачу.
Эта функция должна принимать поток (файл CSV) и функцию (которая обрабатывает фрагменты из потока) и возвращать обещание, когда файл читается до конца (разрешается) или ошибок (отклоняется).
Итак, я начну с:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
// use readable or data event?
parser.on('readable', function() {
// call processor, which may be async
// how do I throttle the amount of promises generated
});
var db = pgp(api.config.mailroom.fileMakerDbConfig);
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
Теперь у меня есть две взаимосвязанные проблемы:
Мне нужно регулировать фактический объем обрабатываемых данных, чтобы не создавать нагрузку на память.Функция передана какprocessor
param часто будет асинхронным, например, сохраняя содержимое файла в БД через библиотеку, основанную на обещаниях (прямо сейчас:pg-promise
). Как таковой, он создаст обещание в памяти и будет многократно двигаться дальше.pg-promise
Библиотека имеет функции для управления этим, какстраница (), но я не могу обернуться, как смешивать потоковые обработчики событий с этими методами обещаний. Прямо сейчас я возвращаю обещание в обработчике дляreadable
раздел после каждогоread()
Это означает, что я создаю огромное количество обещанных операций с базой данных и, в конце концов, выхожу из строя, потому что достиг предела памяти процесса.
У кого-нибудь есть рабочий пример этого, который я могу использовать в качестве отправной точки?
ОБНОВИТЬВозможно, это не один из способов снятия кожи с кошки, но это работает:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
// some checks trimmed out for example
var db = pgp(api.config.mailroom.fileMakerDbConfig);
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
var readDataFromStream = function(index, data, delay) {
var records = [];
var record;
do {
record = parser.read();
if(record != null)
records.push(record);
} while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
parser.pause();
if(records.length)
return records;
};
var processData = function(index, data, delay) {
console.log('processData(' + index + ') > data: ', data);
parser.resume();
};
parser.on('readable', function() {
db.task(function(tsk) {
this.page(readDataFromStream, processData);
});
});
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
Кто-нибудь видит потенциальную проблему с этим подходом?