Parallelisierung der CPU-gebundenen Task wird mit der E / A-Bindung fortgesetzt

Ich versuche, eine gute Methode zur Parallelisierung von Code zu finden, mit der große Datasets verarbeitet und die resultierenden Daten dann in RavenDb importiert werden.

Die Datenverarbeitung ist CPU-gebunden und der Datenbankimport IO-gebunden.

Ich suche nach einer Lösung für die parallele Verarbeitung auf Environment.ProcessorCount Anzahl der Threads. Die resultierenden Daten sollten dann parallel zum obigen Prozess in RavenDb auf x (sagen wir 10) gepoolten Threads importiert werden.

Die Hauptsache hier ist, dass die Verarbeitung fortgesetzt wird, während die fertigen Daten importiert werden, damit die Verarbeitung des nächsten Datensatzes fortgesetzt wird, während auf den Abschluss des Imports gewartet wird.

Ein weiteres Problem ist, dass der Speicher für jeden Stapel nach einem erfolgreichen Import gelöscht werden muss, da der private Arbeitsspeicher problemlos> 5 GB erreichen kann.

Der folgende Code ist das, was ich bisher habe. Beachten Sie, dass die oben genannten Parallelisierungsanforderungen nicht erfüllt werden.

datasupplier.GetDataItems()
    .Partition(batchSize)
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .ForAll(batch =>
    {
        Task.Run(() =>
        {
            ...
        }
    }

GetDataItem liefert unzählige Datenelemente, die in ein Batch-Dataset partitioniert sind. GetDataItem liefert ca. 2.000.000 Elemente mit einer durchschnittlichen Verarbeitungszeit von ca. 0,3 ms.

Das Projekt wird auf dem neuesten .NET 4.5 RC auf einer x64-Plattform ausgeführt.

Aktualisieren.

Mein aktueller Code (siehe oben) ruft Elemente ab und partitioniert sie in Stapel. Jeder Stapel wird parallel auf acht Threads verarbeitet (Environment.ProcessorCount auf i7). Die Verarbeitung ist langsam, CPU-gebunden und speicherintensiv. Wenn die Verarbeitung eines einzelnen Stapels abgeschlossen ist, wird eine Task gestartet, um die resultierenden Daten asynchron in RavenDb zu importieren. Der Batch-Import-Job selbst ist synchron und sieht folgendermaßen aus:

using (var session = Store.OpenSession())
{
    foreach (var data in batch)
    {
        session.Store(data);
    }
    session.SaveChanges();
}

Bei diesem Ansatz treten einige Probleme auf:

Jedes Mal, wenn ein Stapel abgeschlossen ist, wird eine Task gestartet, um den Importjob auszuführen. Ich möchte die Anzahl der parallel laufenden Aufgaben begrenzen (zB max. 10). Auch wenn viele Aufgaben gestartet wurden, scheinen sie nie parallel zu laufen.

Speicherzuordnungen sind ein großes Problem. Sobald ein Stapel verarbeitet / importiert wurde, scheint er noch im Speicher zu bleiben.

Ich suche nach Möglichkeiten, um die oben beschriebenen Probleme zu lösen. Idealerweise möchte ich:

Ein Thread pro logischem Prozessor, der schwere Datenmengen verarbeitet.Etwa zehn parallele Threads warten darauf, dass abgeschlossene Stapel in RavenDb importiert werden.Minimierung der Speicherzuweisungen, dh Aufheben der Zuordnung eines Stapels nach Abschluss der Importaufgabe.So führen Sie keine Importaufträge für einen der Threads für die Stapelverarbeitung aus. Der Import abgeschlossener Chargen sollte parallel zur nächsten verarbeiteten Charge erfolgen.

Lösung

var batchSize = 10000;
var bc = new BlockingCollection<List<Data>>();
var importTask = Task.Run(() =>
{
    bc.GetConsumingEnumerable()
        .AsParallel()
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
        .WithMergeOptions(ParallelMergeOptions.NotBuffered)
        .ForAll(batch =>
        {
            using (var session = Store.OpenSession())
            {
                foreach (var i in batch) session.Store(i);
                session.SaveChanges();
            }
        });
});
var processTask = Task.Run(() =>
{
    datasupplier.GetDataItems()
        .Partition(batchSize)
        .AsParallel()
        .WithDegreeOfParallelism(Environment.ProcessorCount)
        .ForAll(batch =>
        {
            bc.Add(batch.Select(i => new Data()
            {
                ...
            }).ToList());
        });
});

processTask.Wait();
bc.CompleteAdding();
importTask.Wait();

Antworten auf die Frage(3)

Ihre Antwort auf die Frage