Wstawianie dużych CSV do MongoDB za pomocą Node.js i async.queue

Próbuję przesłać i wstawić duże pliki csv (100K wierszy; 10-100M +) do mongo.

Poniższy kod jest trasą, której używam do przyjmowania danych wejściowych z formularza i wstawiania rekordu najpierw do kolekcji metadanych dla wszystkich moich plików CSV, a następnie wstawiania rekordów CSV do własnej kolekcji. Działa dla mniejszych plików (tysiące wierszy), ale trwa zbyt długo, gdy osiągnie wartość 50K +.

Następny fragment wykorzystuje strumień csv dla większych plików (patrz poniżej), ale otrzymuję błędy podczas próby użycia strumienia.

Pytanie: Czy ktoś może pomóc zmodyfikować pierwszy przykład w strumień, aby przetwarzał duże pliki CSV bez zawieszania.

exports.addCSV = function(req,res){

var body = req.body;

fileSystem.renameSync(req.files.myCSV.path, 'uploads/myFile', function(err){
    if(err){
        fileSystem.unlink(req.files.myCSV.path, function(){});
        throw error;
    }
});

var myObject = {  userid: body.userid,
                  name: body.name, 
                  description: body.description 
               };

var MongoClient = require('mongodb').MongoClient;
MongoClient.connect('mongodb://localhost:27017/csvdb', function(err, db){

  if(err) throw err;

  var collection = db.collection('myCSVs');

  collection.insert(myObject, function(err, insertedMyObject){

        csvParser.mapFile('uploads/myFile', function(err, allRows){
                if (err) throw err;

                var collectionId = "Rows_ForID_" + insertedMyObject[0]._id;

                for (r in allRows) {
                    allRows[r].metric = parseFloat(allRows[r].metric);
                }

                var finalcollection = db.collection(collectionId);
                finalcollection.insert(allRows, function(err, insertedAllRows) {
                        if (err) {
                            res.send(404, "Error");
                        }
                        else { 
                            res.send(200);
                        }
                });
        });
    });
});

}

EDYTUJ (Aby ludzie usunęli status wstrzymania):

Próbowałem tego podejścia przy użyciu strumienia:

exports.addCSV = function(req,res){

  var body = req.body;

  fileSystem.renameSync(req.files.myCSV.path, 'uploads/myFile', function(err){
    if(err){
      fileSystem.unlink(req.files.myCSV.path, function(){});
      throw error;
    }
  });

  var myObject = {  userid: body.userid,
                name: body.name, 
                description: body.description 
             };

  var MongoClient = require('mongodb').MongoClient;
  MongoClient.connect('mongodb://localhost:27017/csvdb', function(err, db){

    if(err) throw err;

    var collection = db.collection('myCSVs');

    collection.insert(myObject, function(err, insertedMyObject){

      var collectionId = "Rows_ForID_" + insertedMyObject[0]._id;
      var finalcollection = db.collection(collectionId);
      var q = async.queue(finalcollection.insert.bind(finalcollection), 5);

      q.drain = function() {
          console.log('all items have been processed');
      }

      csv()
      .from.path('uploads/myFile', {columns: true})
      .transform(function(data, index, cb){

              q.push(data, cb); 

      })
      .on('end', function () {
          res.send(200);
          console.log('on.end() executed');
      })
      .on('error', function (err) {
          res.end(500, err.message);
          console.log('on.error() executed');
      });

  });

 });

}

Ale dostaję ten błąd:

events.js:72
    throw er; // Unhandled 'error' event
          ^
TypeError: object is not a function

Po trzecie, wypróbowałem to podejście strumieniowe:

var q = async.queue(function (task,callback) {
finalollection.insert.bind(task,function(err, row) { });
callback();
}, 5);

q.drain = function() {
    console.log('all items have been processed');
}

csv()
.from.path('uploads/myFile', {columns: true})
.transform(function(data, index, cb){
    q.push(data) 
})
.on('end', function () {
    res.send(200);
    console.log('on.end() executed');
})
.on('error', function (err) {
    res.end(500, err.message);
    console.log('on.error() executed');
});

To wstawia kilka, a następnie przerywa:

all items have been processed
all items have been processed
Error: Request aborted
    at IncomingMessage.<anonymous>      

Ten faktycznie próbuje wstawić wiele kolekcji tego samego CSV do db. Wreszcie spróbowałem jednej definicji liniowej q:

var q = async.queue(finalcollection.insert.bind(finalcollection), 5);

Wraz z:

.transform(function(data, index, cb){

                q.push(data,function (err) {
                    console.log('finished processing foo');
                });

})

I wstawia kolekcję kilka razy i przerywa za każdym razem (poniżej jest wyjście, które dzieje się za każdym razem - dlaczego nie wychodzi poprawnie i ponownie wstawia?):

finished processing foo
finished processing foo
finished processing foo
finished processing foo
finished processing foo
all items have been processed

Error: Request aborted
    at IncomingMessage.<anonymous>    (.../node_modules/express/node_modules/connect/node_modules/multiparty/index.js:93:17)
    at IncomingMessage.EventEmitter.emit (events.js:92:17)
    at abortIncoming (http.js:1892:11)
at Socket.serverSocketCloseListener (http.js:1904:5)
at Socket.EventEmitter.emit (events.js:117:20)
at TCP.close (net.js:466:12)

questionAnswers(1)

yourAnswerToTheQuestion