¿Cómo hacer que los hilos de trabajo se cierren después de que el trabajo haya terminado en un patrón multiproceso productor-consumidor?

Estoy tratando de implementar un patrón productor-consumidor multiproceso usando Queue.Queue en Python 2.7. Estoy tratando de descubrir cómo hacer que los consumidores, es decir, los hilos de trabajo, se detengan una vez que se haya realizado todo el trabajo requerido.

Vea el segundo comentario de Martin James a esta respuesta:https://stackoverflow.com/a/19369877/1175080

Envíe una tarea "He terminado", indicando a los subprocesos del grupo que finalicen. Cualquier subproceso que tenga una tarea de este tipo lo solicita y luego se suicida.

Pero esto no funciona para mí. Vea el siguiente código, por ejemplo.

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send an exit indicator for all threads to consume.
    q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

Este programa se cuelga después de que los tres trabajadores hayan leído el indicador de salida, es decir-1 de la cola, porque cada trabajador solicita-1 antes de salir, por lo que la cola nunca se vacía yq.join() nunca regresa

Se me ocurrió la siguiente pero fea solución donde envío un-1 indicador de salida para cada trabajador a través de la cola, para que cada trabajador pueda verlo y suicidarse. Pero el hecho de que tengo que enviar un indicador de salida para cada trabajador se siente un poco feo.

import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1: # -1 is used to indicate that the worker should stop
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send one stop indicator for each worker.
    for i in range(3):
        q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()

Tengo dos preguntas.

¿Puede el método de enviar un único indicador de salida para todos los hilos (como se explica en el segundo comentario dehttps://stackoverflow.com/a/19369877/1175080 por Martin James) incluso trabajo?Si la respuesta a la pregunta anterior es "No", ¿hay alguna manera de resolver el problema de manera que no tenga que enviar un indicador de salida por separado para cada hilo de trabajo?

Respuestas a la pregunta(4)

Su respuesta a la pregunta