Fila de bloqueio e consumidor multiencadeado, como saber quando parar
Tenho um produtor de thread único que cria alguns objetos de tarefa que são adicionados a umArrayBlockingQueue
(que é de tamanho fixo).
Também inicio um consumidor multiencadeado. Isso é criado como um pool de threads fixo Executors.newFixedThreadPool(threadCount);
). Em seguida, submeto algumas intenções do ConsumerWorker a esse threadPool, cada ConsumerWorker tendo uma referência à instância ArrayBlockingQueue acima mencionada.
Cada trabalhador fará umtake()
na fila e lide com a taref
Meu problema é: qual é a melhor maneira de informar um Trabalhador quando não houver mais trabalho a ser feito. Em outras palavras, como digo aos Trabalhadores que o produtor terminou de adicionar a fila e, a partir deste momento, cada trabalhador deve parar quando perceber que a Fila está vazi
O que eu tenho agora é uma configuração em que meu produtor é inicializado com um retorno de chamada que é acionado quando ele termina seu trabalho (de adicionar coisas à fila). Também mantenho uma lista de todos os ConsumerWorkers que criei e enviei ao ThreadPool. Quando a chamada de retorno do produtor me diz que o produtor está pronto, posso dizer isso a cada um dos trabalhadores. Nesse ponto, eles devem simplesmente verificar se a fila não está vazia e, quando ficar vazia, devem parar, permitindo que eu desligue normalmente o pool de encadeamentos ExecutorService. É algo assim
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning || !inputQueue.isEmpty()) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
}
O problema aqui é que eu tenho uma condição de corrida óbvia em que às vezes o produtor termina, sinaliza e os ConsumerWorkers param antes de consumir tudo na fila.
Minha pergunta é qual é a melhor maneira de sincronizar isso para que tudo funcione bem? Devo sincronizar toda a parte em que ele verifica se o produtor está em execução e se a fila está vazia e retira algo da fila em um bloco (no objeto da fila)? Devo apenas sincronizar a atualização doisRunning
booleano na instância do ConsumerWorker? Alguma outra sugestão?
UPDATE, AQUI ESTÁ A IMPLEMENTAÇÃO DE TRABALHO QUE TERMINOU USANDO:
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private final static Produced POISON = new Produced(-1);
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(true) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Produced queueElement = inputQueue.take();
Thread.sleep(new Random().nextInt(100));
if(queueElement==POISON) {
break;
}
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Consumer "+Thread.currentThread().getName()+" END");
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
try {
inputQueue.put(POISON);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Isso foi fortemente inspirado pela resposta de JohnVint abaixo, com apenas algumas pequenas modificaçõe
=== Atualização devido ao comentário de @ vendhan.
Obrigado pela sua obeservação. Você está certo, o primeiro trecho de código nesta pergunta tem (entre outros problemas) aquele em que owhile(isRunning || !inputQueue.isEmpty())
realmente não faz sentido.
Na minha implementação final atual, faço algo mais próximo da sua sugestão de substituir "||" (ou) com "&&" (e), no sentido de que cada trabalhador (consumidor) agora apenas verifica se o elemento que ele obteve da lista é uma pílula de veneno e, se isso for interrompido (teoricamente, podemos dizer que o trabalhador tem estar em execução E a fila não deve estar vazia