Распараллеливание задачи с привязкой к процессору продолжается с привязкой к IO

Я пытаюсь найти хороший способ распараллеливания кода, который выполняет обработку больших наборов данных, а затем импортирует полученные данные в RavenDb.

Обработка данных связана с процессором, а импорт базы данных связан с вводом-выводом.

Я ищу решение для параллельной обработки числа потоков Environment.ProcessorCount. Полученные данные затем должны быть импортированы в RavenDb в x (скажем, 10) объединенных потоках параллельно с вышеуказанным процессом.

Главное здесь - я хочу, чтобы обработка продолжалась во время импорта завершенных данных, чтобы обработка следующего набора данных продолжалась в ожидании завершения импорта.

Другая проблема заключается в том, что после успешного импорта память для каждой партии должна быть удалена, так как личная рабочая память может легко достичь & gt; 5 ГБ.

Код ниже - это то, что я получил до сих пор. Обратите внимание, что он не соответствует требованиям параллелизации, изложенным выше.

datasupplier.GetDataItems()
    .Partition(batchSize)
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .ForAll(batch =>
    {
        Task.Run(() =>
        {
            ...
        }
    }

GetDataItem возвращает перечисляемые элементы данных, которые разбиты на наборы пакетных данных. GetDataItem выдаст ~ 2 000 000 элементов, в среднем около 0,3 мс для обработки.

Проект работает на последней версии .NET 4.5 RC на платформе x64.

Update.

Мой текущий код (как показано выше) будет извлекать элементы и разбивать их на пакеты. Каждый пакет обрабатывается параллельно на восьми потоках (Environment.ProcessorCount на i7). Обработка медленная, с привязкой к процессору и интенсивным использованием памяти. Когда обработка отдельного пакета завершена, запускается задача асинхронного импорта полученных данных в RavenDb. Задание пакетного импорта само по себе является синхронным и выглядит так:

using (var session = Store.OpenSession())
{
    foreach (var data in batch)
    {
        session.Store(data);
    }
    session.SaveChanges();
}

Есть несколько проблем с этим подходом:

Every time a batch is completed a task is started to run the import job. I want to limit the number of tasks that run in parallel (eg. max 10). Additionally even though many tasks are started they seem to never run in parallel.

Memory allocations are a huge problem. Once a batch is processed/imported it seems to still remain in memory.

Я ищу способы позаботиться о проблемах, изложенных выше. В идеале я хочу:

One thread per logical processor doing heavy lifting processing batches of data. Ten or so parallel threads waiting for completed batches to import into RavenDb. To keep memory allocations to a minimum which means unallocating a batch after the import task is complete. To not run import jobs on one of the threads for batch processing. Import of completed batches should run in parallel to the next batch being processed.

Solution

var batchSize = 10000;
var bc = new BlockingCollection<List<Data>>();
var importTask = Task.Run(() =>
{
    bc.GetConsumingEnumerable()
        .AsParallel()
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
        .WithMergeOptions(ParallelMergeOptions.NotBuffered)
        .ForAll(batch =>
        {
            using (var session = Store.OpenSession())
            {
                foreach (var i in batch) session.Store(i);
                session.SaveChanges();
            }
        });
});
var processTask = Task.Run(() =>
{
    datasupplier.GetDataItems()
        .Partition(batchSize)
        .AsParallel()
        .WithDegreeOfParallelism(Environment.ProcessorCount)
        .ForAll(batch =>
        {
            bc.Add(batch.Select(i => new Data()
            {
                ...
            }).ToList());
        });
});

processTask.Wait();
bc.CompleteAdding();
importTask.Wait();

Ответы на вопрос(3)

Ваш ответ на вопрос