Como encerrar os threads de trabalho após o término do trabalho em um padrão produtor-consumidor multithread?
Estou tentando implementar um padrão produtor-consumidor multithread usando Queue.Queue no Python 2.7. Estou tentando descobrir como fazer com que os consumidores, ou seja, os threads de trabalho, parem assim que todo o trabalho necessário estiver concluído.
Veja o segundo comentário de Martin James a esta resposta:https://stackoverflow.com/a/19369877/1175080
Envie uma tarefa 'Concluído', instruindo os threads do pool a serem finalizados. Qualquer thread que recebe essa tarefa a retira da fila e comete suicídio.
Mas isso não funciona para mim. Veja o código a seguir, por exemplo.
import Queue
import threading
import time
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
data = q.get()
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
if data == -1: # -1 is used to indicate that the worker should stop
# Requeue the exit indicator.
q.put(-1)
# Commit suicide.
print 'worker', n, 'is exiting'
break
def master():
# master() sends data to worker() via q.
q = Queue.Queue()
# Create 3 workers.
for i in range(3):
t = threading.Thread(target=worker, args=(i, q))
t.start()
# Send 10 items to work on.
for i in range(10):
q.put(i)
time.sleep(0.5)
# Send an exit indicator for all threads to consume.
q.put(-1)
print 'waiting for workers to finish ...'
q.join()
print 'done'
master()
Este programa trava depois que os três funcionários leem o indicador de saída, ou seja,-1
da fila, porque cada trabalhador faz fila novamente-1
antes de sair, para que a fila nunca fique vazia eq.join()
nunca retorna.
Eu vim com a seguinte solução, mas feia, onde envio uma-1
indicador de saída de cada trabalhador pela fila, para que cada trabalhador possa vê-lo e cometer suicídio. Mas o fato de eu ter que enviar um indicador de saída para cada trabalhador parece um pouco feio.
import Queue
import threading
import time
def worker(n, q):
# n - Worker ID
# q - Queue from which to receive data
while True:
data = q.get()
print 'worker', n, 'got', data
time.sleep(1) # Simulate noticeable data processing time
q.task_done()
if data == -1: # -1 is used to indicate that the worker should stop
print 'worker', n, 'is exiting'
break
def master():
# master() sends data to worker() via q.
q = Queue.Queue()
# Create 3 workers.
for i in range(3):
t = threading.Thread(target=worker, args=(i, q))
t.start()
# Send 10 items to work on.
for i in range(10):
q.put(i)
time.sleep(0.5)
# Send one stop indicator for each worker.
for i in range(3):
q.put(-1)
print 'waiting for workers to finish ...'
q.join()
print 'done'
master()
Eu tenho duas perguntas.
Pode o método de envio de um único indicador de saída para todos os threads (conforme explicado no segundo comentário dehttps://stackoverflow.com/a/19369877/1175080 por Martin James) até funciona?Se a resposta para a pergunta anterior for "Não", existe uma maneira de resolver o problema de uma maneira que não precise enviar um indicador de saída separado para cada encadeamento de trabalho?