Melhor maneira de limitar o número de Tarefas ativas em execução através da Biblioteca de Tarefas Paralelas

Considere uma fila segurando umamuito de trabalhos que precisam de processamento. A limitação da fila só pode obter 1 job de cada vez e não há como saber quantos jobs existem. Os trabalhos levam 10s para serem concluídos e envolvem muita espera por respostas de serviços da Web, portanto, não estão vinculados à CPU.

Se eu usar algo assim

while (true)
{
   var job = Queue.PopJob();
   if (job == null)
      break;
   Task.Factory.StartNew(job.Execute); 
}

Em seguida, ele furiosamente vai colocar os trabalhos da fila muito mais rápido do que pode completá-los, ficar sem memória e cair na sua bunda. >.

Não posso usar (não acho)ParallelOptions.MaxDegreeOfParallelism porque eu não posso usar Parallel.Invoke ou Parallel.ForEach

3 alternativas que encontrei

Substitua Task.Factory.StartNew por

Task task = new Task(job.Execute,TaskCreationOptions.LongRunning)
task.Start();

O que parece resolver um pouco o problema, mas eu não souclaro exatamente o que isso está fazendo e se esse é o melhor método.

Criar umaagendador de tarefas personalizado que limita o grau de simultaneidade

Use algo comoBlockingCollection para adicionar trabalhos à coleção quando iniciado e remover quando terminar para limitar o número que pode estar em execução.

Com o número 1, tenho que confiar que a decisão certa é tomada automaticamente, # 2 / # 3 Eu tenho que descobrir o número máximo de tarefas que podem ser executadas por mim mesmo.

Eu entendi isso corretamente - qual é o melhor caminho, ou há outro jeito?

EDITAR - Isso é o que eu tenho com as respostas abaixo, padrão produtor-consumidor.

Assim como o objetivo geral da taxa de transferência não foi para desempacotar os trabalhos mais rápido do que poderia ser processado e não ter fila de polling de múltiplos threads (não mostrada aqui, mas é uma operação sem bloqueio e levará a enormes custos de transação se pesquisada em alta freqüência de vários locais) .

// BlockingCollection<>(1) will block if try to add more than 1 job to queue (no
// point in being greedy!), or is empty on take.
var BlockingCollection<Job> jobs = new BlockingCollection<Job>(1);

// Setup a number of consumer threads.
// Determine MAX_CONSUMER_THREADS empirically, if 4 core CPU and 50% of time
// in job is blocked waiting IO then likely be 8.
for(int numConsumers = 0; numConsumers < MAX_CONSUMER_THREADS; numConsumers++)
{
   Thread consumer = new Thread(() =>
   {
      while (!jobs.IsCompleted)
      {
         var job = jobs.Take();
         job.Execute();
      }
   }
   consumer.Start();
}

// Producer to take items of queue and put in blocking collection ready for processing
while (true)
{
    var job = Queue.PopJob();
    if (job != null)
       jobs.Add(job);
    else
    {
       jobs.CompletedAdding()
       // May need to wait for running jobs to finish
       break;
    }
}

questionAnswers(6)

yourAnswerToTheQuestion