Python Multiprocessing und Behandlung von Ausnahmen bei Workern

Ich verwende die Python-Multiprozessor-Bibliothek für einen Algorithmus, bei dem viele Mitarbeiter bestimmte Daten verarbeiten und das Ergebnis an den übergeordneten Prozess zurückgeben. Ich benutze multiprocessing.Queue, um Jobs an Arbeiter zu übergeben und zweitens, um Ergebnisse zu sammeln.

Es funktioniert alles ziemlich gut, bis der Mitarbeiter einige Datenblöcke nicht mehr verarbeitet. Im vereinfachten Beispiel unten hat jeder Mitarbeiter zwei Phasen:

Initialisierung - kann fehlschlagen, in diesem Fall sollte der Arbeiter vernichtet werdenDatenverarbeitung - Die Verarbeitung eines Datenblocks kann fehlschlagen. In diesem Fall sollte der Mitarbeiter diesen Block überspringen und mit den nächsten Daten fortfahren.

Wenn eine dieser Phasen fehlschlägt, erhalte ich nach Abschluss des Skripts einen Deadlock. Dieser Code simuliert mein Problem:

import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.2
fail_job_p = 0.3


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except:
        print "!Worker %d init fail" % args
        return
    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except:
            print "!Job %d failed, skip..." % job
        finally:
            jobs_queue.task_done()
    # Telling that we are done with processing stop token
    jobs_queue.task_done()



#========= Parent =========
jobs = mp.JoinableQueue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

# Collecting the results
# What if some workers failed to process the job and we have
# less results than expected
for r in range(results_to_expect):
    result = results.get()
    print result

#Signal all workers to finish
for i in range(workers_count):
    jobs.put(None)

#Wait for them to finish
jobs.join()

Ich habe zwei Fragen zu diesem Code:

Wanninit() schlägt fehl, wie kann man feststellen, dass der Worker ungültig ist und nicht warten, bis er fertig ist?Wanndo_work() schlägt fehl, wie kann der übergeordnete Prozess benachrichtigt werden, dass in der Ergebniswarteschlange weniger Ergebnisse zu erwarten sind?

Danke für die Hilfe!

Antworten auf die Frage(1)

Ihre Antwort auf die Frage