Python Multiprocessing und Behandlung von Ausnahmen bei Workern
Ich verwende die Python-Multiprozessor-Bibliothek für einen Algorithmus, bei dem viele Mitarbeiter bestimmte Daten verarbeiten und das Ergebnis an den übergeordneten Prozess zurückgeben. Ich benutze multiprocessing.Queue, um Jobs an Arbeiter zu übergeben und zweitens, um Ergebnisse zu sammeln.
Es funktioniert alles ziemlich gut, bis der Mitarbeiter einige Datenblöcke nicht mehr verarbeitet. Im vereinfachten Beispiel unten hat jeder Mitarbeiter zwei Phasen:
Initialisierung - kann fehlschlagen, in diesem Fall sollte der Arbeiter vernichtet werdenDatenverarbeitung - Die Verarbeitung eines Datenblocks kann fehlschlagen. In diesem Fall sollte der Mitarbeiter diesen Block überspringen und mit den nächsten Daten fortfahren.Wenn eine dieser Phasen fehlschlägt, erhalte ich nach Abschluss des Skripts einen Deadlock. Dieser Code simuliert mein Problem:
import multiprocessing as mp
import random
workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.2
fail_job_p = 0.3
#========= Worker =========
def do_work(job_state, arg):
if random.random() < fail_job_p:
raise Exception("Job failed")
return "job %d processed %d" % (job_state, arg)
def init(args):
if random.random() < fail_init_p:
raise Exception("Worker init failed")
return args
def worker_function(args, jobs_queue, result_queue):
# INIT
# What to do when init() fails?
try:
state = init(args)
except:
print "!Worker %d init fail" % args
return
# DO WORK
# Process data in the jobs queue
for job in iter(jobs_queue.get, None):
try:
# Can throw an exception!
result = do_work(state, job)
result_queue.put(result)
except:
print "!Job %d failed, skip..." % job
finally:
jobs_queue.task_done()
# Telling that we are done with processing stop token
jobs_queue.task_done()
#========= Parent =========
jobs = mp.JoinableQueue()
results = mp.Queue()
for i in range(workers_count):
mp.Process(target=worker_function, args=(i, jobs, results)).start()
# Populate jobs queue
results_to_expect = 0
for j in range(30):
jobs.put(j)
results_to_expect += 1
# Collecting the results
# What if some workers failed to process the job and we have
# less results than expected
for r in range(results_to_expect):
result = results.get()
print result
#Signal all workers to finish
for i in range(workers_count):
jobs.put(None)
#Wait for them to finish
jobs.join()
Ich habe zwei Fragen zu diesem Code:
Wanninit()
schlägt fehl, wie kann man feststellen, dass der Worker ungültig ist und nicht warten, bis er fertig ist?Wanndo_work()
schlägt fehl, wie kann der übergeordnete Prozess benachrichtigt werden, dass in der Ergebniswarteschlange weniger Ergebnisse zu erwarten sind?Danke für die Hilfe!