Wstawianie dużych CSV do MongoDB za pomocą Node.js i async.queue
Próbuję przesłać i wstawić duże pliki csv (100K wierszy; 10-100M +) do mongo.
Poniższy kod jest trasą, której używam do przyjmowania danych wejściowych z formularza i wstawiania rekordu najpierw do kolekcji metadanych dla wszystkich moich plików CSV, a następnie wstawiania rekordów CSV do własnej kolekcji. Działa dla mniejszych plików (tysiące wierszy), ale trwa zbyt długo, gdy osiągnie wartość 50K +.
Następny fragment wykorzystuje strumień csv dla większych plików (patrz poniżej), ale otrzymuję błędy podczas próby użycia strumienia.
Pytanie: Czy ktoś może pomóc zmodyfikować pierwszy przykład w strumień, aby przetwarzał duże pliki CSV bez zawieszania.
exports.addCSV = function(req,res){
var body = req.body;
fileSystem.renameSync(req.files.myCSV.path, 'uploads/myFile', function(err){
if(err){
fileSystem.unlink(req.files.myCSV.path, function(){});
throw error;
}
});
var myObject = { userid: body.userid,
name: body.name,
description: body.description
};
var MongoClient = require('mongodb').MongoClient;
MongoClient.connect('mongodb://localhost:27017/csvdb', function(err, db){
if(err) throw err;
var collection = db.collection('myCSVs');
collection.insert(myObject, function(err, insertedMyObject){
csvParser.mapFile('uploads/myFile', function(err, allRows){
if (err) throw err;
var collectionId = "Rows_ForID_" + insertedMyObject[0]._id;
for (r in allRows) {
allRows[r].metric = parseFloat(allRows[r].metric);
}
var finalcollection = db.collection(collectionId);
finalcollection.insert(allRows, function(err, insertedAllRows) {
if (err) {
res.send(404, "Error");
}
else {
res.send(200);
}
});
});
});
});
}
EDYTUJ (Aby ludzie usunęli status wstrzymania):
Próbowałem tego podejścia przy użyciu strumienia:
exports.addCSV = function(req,res){
var body = req.body;
fileSystem.renameSync(req.files.myCSV.path, 'uploads/myFile', function(err){
if(err){
fileSystem.unlink(req.files.myCSV.path, function(){});
throw error;
}
});
var myObject = { userid: body.userid,
name: body.name,
description: body.description
};
var MongoClient = require('mongodb').MongoClient;
MongoClient.connect('mongodb://localhost:27017/csvdb', function(err, db){
if(err) throw err;
var collection = db.collection('myCSVs');
collection.insert(myObject, function(err, insertedMyObject){
var collectionId = "Rows_ForID_" + insertedMyObject[0]._id;
var finalcollection = db.collection(collectionId);
var q = async.queue(finalcollection.insert.bind(finalcollection), 5);
q.drain = function() {
console.log('all items have been processed');
}
csv()
.from.path('uploads/myFile', {columns: true})
.transform(function(data, index, cb){
q.push(data, cb);
})
.on('end', function () {
res.send(200);
console.log('on.end() executed');
})
.on('error', function (err) {
res.end(500, err.message);
console.log('on.error() executed');
});
});
});
}
Ale dostaję ten błąd:
events.js:72
throw er; // Unhandled 'error' event
^
TypeError: object is not a function
Po trzecie, wypróbowałem to podejście strumieniowe:
var q = async.queue(function (task,callback) {
finalollection.insert.bind(task,function(err, row) { });
callback();
}, 5);
q.drain = function() {
console.log('all items have been processed');
}
csv()
.from.path('uploads/myFile', {columns: true})
.transform(function(data, index, cb){
q.push(data)
})
.on('end', function () {
res.send(200);
console.log('on.end() executed');
})
.on('error', function (err) {
res.end(500, err.message);
console.log('on.error() executed');
});
To wstawia kilka, a następnie przerywa:
all items have been processed
all items have been processed
Error: Request aborted
at IncomingMessage.<anonymous>
Ten faktycznie próbuje wstawić wiele kolekcji tego samego CSV do db. Wreszcie spróbowałem jednej definicji liniowej q:
var q = async.queue(finalcollection.insert.bind(finalcollection), 5);
Wraz z:
.transform(function(data, index, cb){
q.push(data,function (err) {
console.log('finished processing foo');
});
})
I wstawia kolekcję kilka razy i przerywa za każdym razem (poniżej jest wyjście, które dzieje się za każdym razem - dlaczego nie wychodzi poprawnie i ponownie wstawia?):
finished processing foo
finished processing foo
finished processing foo
finished processing foo
finished processing foo
all items have been processed
Error: Request aborted
at IncomingMessage.<anonymous> (.../node_modules/express/node_modules/connect/node_modules/multiparty/index.js:93:17)
at IncomingMessage.EventEmitter.emit (events.js:92:17)
at abortIncoming (http.js:1892:11)
at Socket.serverSocketCloseListener (http.js:1904:5)
at Socket.EventEmitter.emit (events.js:117:20)
at TCP.close (net.js:466:12)