C # производитель / потребитель

Недавно мы сталкивались с реализацией шаблона производитель / потребитель c #. Это'Это очень просто и (по крайней мере для меня) очень элегантно.

похоже, он был разработан в 2006 году, поэтому мне было интересно, если эта реализация

- безопасный

- все еще применимо

Код ниже (оригинальный код был указан наhttp://bytes.com/topic/net/answers/575276-producer-consumer#post2251375)

using System;  
using System.Collections;  
using System.Threading;

public class Test
{  
    static ProducerConsumer queue;

    static void Main()
    {
        queue = new ProducerConsumer();
        new Thread(new ThreadStart(ConsumerJob)).Start();

        Random rng = new Random(0);
        for (int i=0; i < 10; i++)
        {
            Console.WriteLine ("Producing {0}", i);
            queue.Produce(i);
            Thread.Sleep(rng.Next(1000));
        }
    }

    static void ConsumerJob()
    {
        // Make sure we get a different random seed from the
        // first thread
        Random rng = new Random(1);
        // We happen to know we've only got 10 
        // items to receive
        for (int i=0; i < 10; i++)
        {
            object o = queue.Consume();
            Console.WriteLine ("\t\t\t\tConsuming {0}", o);
            Thread.Sleep(rng.Next(1000));
        }
    }
}

public class ProducerConsumer
{
    readonly object listLock = new object();
    Queue queue = new Queue();

    public void Produce(object o)
    {
        lock (listLock)
        {
            queue.Enqueue(o);

            // We always need to pulse, even if the queue wasn't
            // empty before. Otherwise, if we add several items
            // in quick succession, we may only pulse once, waking
            // a single thread up, even if there are multiple threads
            // waiting for items.            
            Monitor.Pulse(listLock);
        }
    }

    public object Consume()
    {
        lock (listLock)
        {
            // If the queue is empty, wait for an item to be added
            // Note that this is a while loop, as we may be pulsed
            // but not wake up before another thread has come in and
            // consumed the newly added object. In that case, we'll
            // have to wait for another pulse.
            while (queue.Count==0)
            {
                // This releases listLock, only reacquiring it
                // after being woken up by a call to Pulse
                Monitor.Wait(listLock);
            }
            return queue.Dequeue();
        }
    }
}
 sɐunıɔןɐqɐp08 нояб. 2017 г., 14:19
Привет, lboregard, есть рабочий пример использования BlockingCollection <T> реализовать простой шаблон Producer-Consumer вэта ссылка.
 James Black01 нояб. 2009 г., 05:42
На какую версию .NET вы нацелены?

Ответы на вопрос(4)

как работает Monitor.Wait / Pulse (и многое о потоках в целом) из приведенного выше фрагмента кода исерия статей это из. Так что, как говорит Джон, он имеет большую ценность и действительно безопасен и применим.

Тем не менее, начиная с .NET 4, существуетреализация очереди производитель-потребитель в рамках, Я только нашел это сам, но до этого момента он делает все, что мне нужно.

.концепция очереди производителя / потребителяпуть хотя старше этого :)

Да, этот код безопасен, насколько яЯ знаю - но у него есть некоторые недостатки:

Это'не является общим. Современная версия, безусловно, будет общей.У него нет возможности остановить очередь. Один простой способ остановить очередь (чтобы все потоки потребителя удалились) - это иметь "прекрати работу токен, который можно поставить в очередь. Затем вы добавляете столько токенов, сколько у вас есть потоков. Кроме того, у вас есть отдельный флаг, чтобы указать, что вы хотите остановить. (Это позволяет другим потокам остановиться до завершения всей текущей работы в очереди.)Если рабочих мест очень мало, потребление одной работы за раз может быть не самым эффективным.

Если честно, идеи, лежащие в основе кода, важнее самого кода.

 Jon Skeet28 июн. 2015 г., 08:38
@TakeMeAsAGuest: Не уверен, что ты имеешь в виду в первой части - ты говоришь?Вы видели случаи, когда на мониторе ожидали потоки, но пульс ничего не делал? Что касается производительности - есть много, много разных сценариев (оборудование, программное обеспечение, количество ожидающих потоков и т. Д.). Я'посмотрим, смогу ли я выкопать некоторые ссылки в Джо ДаффиКнига ...
 TakeMeAsAGuest28 июн. 2015 г., 04:05
пульсирование не обязательно разбудит потребителя, я знаю, что это маловероятно, но теоретически только производители могут работать бесконечно. Кроме того, измеренное ожидание / импульс монитора не имеет никакого преимущества в производительности по сравнению с ручками ожидания события.

s универсальный и имеет метод для постановки нуля в очередь (или любого другого флага, который выя хотел бы использовать), чтобы сообщить рабочим потокам о выходе.

Код взят здесь:http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ConsoleApplication1
{

    public class TaskQueue<t> : IDisposable where T : class
    {
        object locker = new object();
        Thread[] workers;
        Queue<t> taskQ = new Queue<t>();

        public TaskQueue(int workerCount)
        {
            workers = new Thread[workerCount];

            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
                (workers[i] = new Thread(Consume)).Start();
        }

        public void Dispose()
        {
            // Enqueue one null task per worker to make each exit.
            foreach (Thread worker in workers) EnqueueTask(null);
            foreach (Thread worker in workers) worker.Join();
        }

        public void EnqueueTask(T task)
        {
            lock (locker)
            {
                taskQ.Enqueue(task);
                Monitor.PulseAll(locker);
            }
        }

        void Consume()
        {
            while (true)
            {
                T task;
                lock (locker)
                {
                    while (taskQ.Count == 0) Monitor.Wait(locker);
                    task = taskQ.Dequeue();
                }
                if (task == null) return;         // This signals our exit
                Console.Write(task);
                Thread.Sleep(1000);              // Simulate time-consuming task
            }
        }
    }
}
</t></t></t>
 Sumit Ghosh06 мар. 2012 г., 22:32
На данный момент это лучшая реализация шаблона производителя. Я недавно использовал это в своем многопоточном приложении, и оно работает гладко даже при 1000-1500 нитях.
 FaNIX27 июл. 2015 г., 07:16
Можете ли вы обновить свой ответ с правильным кодом, упомянутым Маркусом?
 dashton09 июн. 2014 г., 17:49
Да, именно так, как говорит @Inactivist, просто передайте Действие <T> в конструкторе и передайте ему удаленный объект (вне замка). Извините, я пропустил это. Кстати, все это теперь немного избыточно, посколькунамного проще с блокировками коллекций / параллельными очередями и параллельные библиотеки - или даже RX!
 Inactivist03 янв. 2014 г., 18:58
я уверен, что яя что-то здесь упускаюочень ржавый на мои навыки C #), но этот пример невызвать метод на потребляемомtask (в отличие от упомянутыхнеуниверсальный код который хранит и вызываетAction делегат в потребителя.) Так чтосмысл сделать это универсальным, если это не такдемонстрация вызова метода для потребляемой задачи? (Попытка обернуть голову вокруг этого примера и ссылочной реализации.)
 Marcus23 мая 2014 г., 12:37
@Inactivist Вы можете добавить действие <T> в ctor сохраните как поле _action и вызовите его сразу после выполнения Dequeue (), например: _action (task);

Предупреждение: Если вы читаете комментарии, выпоймешь мой ответ неверный :)

Там'это возможнотупик в вашем коде.

Представьте себе следующий случай, для ясности я использовал однопотоковый подход, но его должно быть легко преобразовать в многопоточность с помощью сна:

// We create some actions...
object locker = new object();

Action action1 = () => {
    lock (locker)
    {
        System.Threading.Monitor.Wait(locker);
        Console.WriteLine("This is action1");
    }
};

Action action2 = () => {
    lock (locker)
    {
        System.Threading.Monitor.Wait(locker);
        Console.WriteLine("This is action2");
    }
};

// ... (stuff happens, etc.)

// Imagine both actions were running
// and there's 0 items in the queue

// And now the producer kicks in...
lock (locker)
{
    // This would add a job to the queue

    Console.WriteLine("Pulse now!");
    System.Threading.Monitor.Pulse(locker);
}

// ... (more stuff)
// and the actions finish now!

Console.WriteLine("Consume action!");
action1(); // Oops... they're locked...
action2();

Пожалуйста, дайте мне знать, если это нене имеет никакого смысла.

Если это подтвердится, то ответ на ваш вопрос: «нет, это нет безопасно " ;) Надеюсь, это поможет.

 DiogoNeves20 мар. 2013 г., 17:58
Я должен признать, что я в основном забыл детали того, что я думал. Читая назад, я считаю, что проблема заключается в цикле пока. Это'будет держать поток потребителей заблокированным, покаЧто-то в очереди, и это не позволяет производителю блокировать и ставить в очередь. Имеет ли это смысл?
 supercat12 мар. 2013 г., 20:09
Я нене вижу никакого тупика с оригинальным постеромкод, потому что элементы могут быть добавлены в очередь в моменты времени, когда каждый потребитель находится за пределами блокировки (в этом случае потребитель получит блокировку в следующий раз, когда он получит блокировку)т пусто, и поэтому оно ненужно ждать) или ждать импульса (в этом случае импульс гарантированно разбудит потребителя).
 DiogoNeves22 мар. 2013 г., 18:46
Спасибо! :) Имеет смысл сейчас
 supercat20 мар. 2013 г., 19:15
Если последний производитель устанавливаетAllDone а затем пульсирует монитор, и еслиConsume метод проверяетAllDone как часть условия цикла while и, увидевAllDone отправляет импульс монитору и затем завершает работу (либо ничего не возвращая, либо выбрасывая исключение), тогда, даже если есть несколько потребителей (которые будут обрабатывать элементы очереди в произвольной последовательности), все ожидающие потребители будут разбужены и получат команду на выход.
 supercat20 мар. 2013 г., 19:13
Monitor.Wait() метод снимает блокировкуждет, пока не начнёт пульсировать какая-то другая нить. Поток потребителя будет заблокирован в пределахConsume метод, но это победилоне мешает производителю кормить его материалом. Самая большая опасность состоит в том, что, если производитель завершит работу, прежде чем сгенерировать все данные, которые ожидает потребитель, потребитель будет ждать вечно вещей, которые никогда не появятся. Это может быть решено, например, имеяAllDone флаг.

Ваш ответ на вопрос