Równoległość zadania związanego z CPU kontynuowana z IO związanym

Próbuję znaleźć dobry sposób na równoległość kodu, który przetwarza duże zbiory danych, a następnie importuje uzyskane dane do RavenDb.

Przetwarzanie danych jest związane z CPU i importem bazy danych IO.

Szukam rozwiązania do przetwarzania równoległego na liczbę wątków Environment.ProcessorCount. Uzyskane dane należy następnie zaimportować do RavenDb na x (powiedzmy 10) połączonych wątków równolegle z powyższym procesem.

Najważniejsze jest to, aby przetwarzanie było kontynuowane podczas importowania zakończonych danych, tak aby przetwarzanie następnego zestawu danych trwało w oczekiwaniu na zakończenie importu.

Inną kwestią jest to, że pamięć dla każdej partii musi zostać odrzucona po udanym imporcie, ponieważ prywatna pamięć robocza może łatwo osiągnąć> 5 GB.

Kod poniżej jest tym, co mam do tej pory. Należy pamiętać, że nie spełnia on wymagań dotyczących równoległości opisanych powyżej.

datasupplier.GetDataItems()
    .Partition(batchSize)
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .ForAll(batch =>
    {
        Task.Run(() =>
        {
            ...
        }
    }

GetDataItem daje wyliczalne elementy danych, które są podzielone na zestaw danych wsadowych. GetDataItem dostarczy ~ 2 000 000 elementów o średniej wartości około 0,3 ms do przetwarzania.

Projekt działa na najnowszym .NET 4.5 RC na platformie x64.

Aktualizacja.

Mój obecny kod (patrz wyżej) spowoduje pobranie elementów i podzielenie ich na partie. Każda partia jest przetwarzana równolegle w ośmiu wątkach (Environment.ProcessorCount na i7). Przetwarzanie jest powolne, związane z procesorem i intensywne pamięci. Po zakończeniu przetwarzania pojedynczej partii rozpoczyna się zadanie asynchronicznego importowania danych wynikowych do RavenDb. Zadanie importu wsadowego samo w sobie jest synchroniczne i wygląda tak:

using (var session = Store.OpenSession())
{
    foreach (var data in batch)
    {
        session.Store(data);
    }
    session.SaveChanges();
}

Jest kilka problemów z tym podejściem:

Za każdym razem, gdy partia jest zakończona, zadanie jest uruchamiane, aby uruchomić zadanie importu. Chcę ograniczyć liczbę zadań wykonywanych równolegle (np. Maks. 10). Dodatkowo, mimo że wiele zadań jest uruchamianych, wydaje się, że nigdy nie działają równolegle.

Przydział pamięci to ogromny problem. Po przetworzeniu / zaimportowaniu partii wydaje się, że nadal pozostaje w pamięci.

Szukam sposobów, aby zająć się problemami opisanymi powyżej. Idealnie chcę:

Jeden wątek na procesor logiczny, który wykonuje ciężkie partie przetwarzania danych.Dziesięć lub więcej równoległych wątków oczekujących na ukończone partie do zaimportowania do RavenDb.Aby zachować przydziały pamięci do minimum, co oznacza nieprzydzielenie partii po zakończeniu zadania importowania.Aby nie uruchamiać zadań importowania na jednym z wątków do przetwarzania wsadowego. Import zakończonych partii powinien przebiegać równolegle z kolejną przetwarzaną partią.

Rozwiązanie

var batchSize = 10000;
var bc = new BlockingCollection<List<Data>>();
var importTask = Task.Run(() =>
{
    bc.GetConsumingEnumerable()
        .AsParallel()
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
        .WithMergeOptions(ParallelMergeOptions.NotBuffered)
        .ForAll(batch =>
        {
            using (var session = Store.OpenSession())
            {
                foreach (var i in batch) session.Store(i);
                session.SaveChanges();
            }
        });
});
var processTask = Task.Run(() =>
{
    datasupplier.GetDataItems()
        .Partition(batchSize)
        .AsParallel()
        .WithDegreeOfParallelism(Environment.ProcessorCount)
        .ForAll(batch =>
        {
            bc.Add(batch.Select(i => new Data()
            {
                ...
            }).ToList());
        });
});

processTask.Wait();
bc.CompleteAdding();
importTask.Wait();

questionAnswers(3)

yourAnswerToTheQuestion