Ждите и уведомляйте в Потоке Потребителя и Производителя

Только начал изучать многопоточность. У меня 5 производителей и 2 потребителя в нескольких потоках. В основном эта программа добавляет 100 элементов в очередь. Производитель прекращает добавление, когда размер очереди равен 100. Я бы хотел, чтобы потребитель уведомил производителя, когда потребитель удалит все элементы из очереди, чтобы производитель мог начать добавление снова. В настоящее время производитель будет ждать, но никогда не получит уведомление от потребителя.

Режиссер:

public class Producer implements Runnable {

private BlockingQueue sharedQueue;
private final int queueSize;
private Object lock = new Object();

  public Producer(BlockingQueue sharedQueue, int queueSize){
    this.sharedQueue = sharedQueue;
    this.queueSize = queueSize;
  }

  public void run() {
    while(true) {
        if(sharedQueue.size()== queueSize){

                try {
                    synchronized (lock) {
                    sharedQueue.wait();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
         }

        try {
            sharedQueue.put("Producer: " + sharedQueue.size());
            Thread.sleep(500);
            System.out.println("Producer:  Queue Size " + sharedQueue.size() + " Current Thread " + Thread.currentThread());

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

потребитель:

public class Consumer implements Runnable{

private BlockingQueue sharedQueue;
private final int queueSize;
private final int queueEmpty=0;
private Object lock = new Object();

   public Consumer(BlockingQueue sharedQueue, int queueSize){
    this.sharedQueue = sharedQueue;
    this.queueSize = queueSize;
   }
//Notify awaiting thread if the sharedQueue is empty
   public void run() {
    while (true) {
        if(sharedQueue.size()==queueEmpty){
            synchronized (lock) {
            this.notifyAll();
            }
        }
            try {

                    sharedQueue.take();
                    Thread.sleep(800);
                    System.out.println("Consumer: Queue Size " + sharedQueue.size() + " Current Thread" + Thread.currentThread());

            }catch(InterruptedException e){
                e.printStackTrace();
            }
    }

  }
}

Основной класс

  public class App{ 

//A simple program to illustrate how producer and consumer pattern works with blocking queue using executor service
public static void main( String[] args )
{
    final BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<String> (100);
    final int queueSize =100;
    final int producerNum = 5;
    final int consumerNum = 2;

    final ExecutorService executorProducer = Executors.newFixedThreadPool(producerNum);
    final ExecutorService executorConsumer = Executors.newFixedThreadPool(consumerNum);

    for(int i=0;i<producerNum;i++){
        Producer producer = new Producer(sharedQueue,queueSize);
        executorProducer.execute(producer);
    }

    for(int j=0;j<consumerNum;j++){
        Consumer consumer = new Consumer(sharedQueue,queueSize);
        executorConsumer.execute(consumer);
    }



   }
 }
 hao07 июн. 2016 г., 17:52
@ Натан точно ...
 hao07 июн. 2016 г., 18:28
@ Натан Я сделал несколько изменений в коде. Это все еще показывает то же самое сообщение об ошибке. Кажется, что установка синхронизированного блока вокруг wait и notifyall не решает проблему ...
 Nathan Hughes07 июн. 2016 г., 17:50
этот код должен генерировать IllegalMonitorStateException, если вызывается notifyAll.
 Nathan Hughes07 июн. 2016 г., 17:57
Здесь есть несколько ошибок. Чтение руководства по Oracle на защищенных блоках должно чему-то вас научить.
 Nathan Hughes07 июн. 2016 г., 18:33
Я сказал вам, что есть несколько проблем. прочитайте урок оракула:docs.oracle.com/javase/tutorial/essential/concurrency/...
 hao07 июн. 2016 г., 18:08
Для примеров защищенных блоков методы wait и notifyAll вызываются в объекте, который обновляется как производителем, так и потребителем. Также они переключают логическую переменную для запуска цикла while. Я понимаю этот пример, но когда я вызываю sharedQueue.wait () или sharedQueue.notifyAll (), я получу исключение нелегальноеMonitorStateException
 Vijay07 июн. 2016 г., 18:40
вам не нужно ждать / уведомлять, когда вы используете очередь блокировки, так как очередь будет заблокирована, когда она достигнет предела для производителя и для потребителя, когда в очереди больше не осталось задач.
 hao07 июн. 2016 г., 17:52
Я думаю, что notifyAll разбудит все потоки, которые ожидают этого объекта. Пробужденные потоки не смогут продолжить работу, пока текущий поток не снимет блокировку этого объекта. Итак, моя ошибка в том, что ждать и уведомлять нужно на одном и том же объекте?
 Sotirios Delimanolis07 июн. 2016 г., 17:49
Как вы думаетеthis.notifyAll(); делает? Почему ты так думаешь?

Ответы на вопрос(1)

Решение Вопроса

страница:

Реализации BlockingQueue являются поточно-ориентированными. Все методы очередей достигают своих эффектов атомарно, используя внутренние блокировки или другие формы управления параллелизмом.

Поскольку вы уже используетеBlockingQueuesможно избавиться отwait() а такжеnotify() API-интерфейсы.

Пример кода для нескольких производителей и потребителей, использующихBlockingQueue:

import java.util.concurrent.*;

public class ProducerConsumerDemo {

    public static void main(String args[]){

     BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

      Thread prodThread1 = new Thread(new Producer(sharedQueue,1));
      Thread prodThread2 = new Thread(new Producer(sharedQueue,2));
      Thread consThread1 = new Thread(new Consumer(sharedQueue,1));
      Thread consThread2 = new Thread(new Consumer(sharedQueue,2));

      prodThread1.start();
      prodThread2.start();
      consThread1.start();
      consThread2.start(); 
   }

}

class Producer implements Runnable {

    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;

    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        for(int i=1; i<= 5; i++){
            try {
                int number = i+(10*threadNo);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }

}

class Consumer implements Runnable{

    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }

    @Override
    public void run() {
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

Как это работает?

Продюсер темы 1 помещает целые числа в диапазоне от 11 до 15 вBlockingQueueПродюсер темы 2 помещает целые числа в диапазоне от 21 до 25 вBlockingQueueЛюбой из потребительских потоков - поток 1 или поток 2 считывает значения изBlockingQueue (Целое число в этом примере)

Пример вывода:

Produced:21:by thread:2
Produced:11:by thread:1
Produced:12:by thread:1
Produced:13:by thread:1
Produced:14:by thread:1
Produced:22:by thread:2
Produced:23:by thread:2
Produced:24:by thread:2
Produced:25:by thread:2
Consumed: 21:by thread:1
Consumed: 12:by thread:1
Consumed: 13:by thread:1
Consumed: 14:by thread:1
Consumed: 22:by thread:1
Consumed: 23:by thread:1
Consumed: 24:by thread:1
Consumed: 25:by thread:1
Produced:15:by thread:1
Consumed: 11:by thread:2
Consumed: 15:by thread:1

Ваш ответ на вопрос