Многопроцессорная обработка Python и обработка исключений в рабочих

Я использую многопроцессорную библиотеку Python для алгоритма, в котором у меня много работников, обрабатывающих определенные данные и возвращающих результат в родительский процесс. Я использую multiprocessing.Queue для передачи рабочих мест работникам, а во-вторых, для сбора результатов.

Все это работает довольно хорошо, пока рабочий не может обработать часть данных. В приведенном ниже упрощенном примере каждый работник имеет две фазы:

инициализация - может произойти сбой, в этом случае рабочий должен быть уничтоженобработка данных - обработка фрагмента данных может завершиться сбоем, в этом случае рабочий должен пропустить этот фрагмент и перейти к следующим данным.

Когда любой из этих этапов не удается, я получаю тупик после завершения сценария. Этот код имитирует мою проблему:

import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.2
fail_job_p = 0.3


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except:
        print "!Worker %d init fail" % args
        return
    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except:
            print "!Job %d failed, skip..." % job
        finally:
            jobs_queue.task_done()
    # Telling that we are done with processing stop token
    jobs_queue.task_done()



#========= Parent =========
jobs = mp.JoinableQueue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

# Collecting the results
# What if some workers failed to process the job and we have
# less results than expected
for r in range(results_to_expect):
    result = results.get()
    print result

#Signal all workers to finish
for i in range(workers_count):
    jobs.put(None)

#Wait for them to finish
jobs.join()

У меня есть два вопроса об этом коде:

когдаinit() не удается, как обнаружить, что работник недействителен и не ждать его завершения?когдаdo_work() не удается, как уведомить родительский процесс о том, что в очереди результатов следует ожидать меньшего количества результатов?

Спасибо за помощь!

Ответы на вопрос(1)

Решение Вопроса

Я немного изменил ваш код, чтобы он работал (см. Пояснение ниже).

import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.5
fail_job_p = 0.4


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except:
        print "!Worker %d init fail" % args
        result_queue.put('init failed')
        return
    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except:
            print "!Job %d failed, skip..." % job
            result_queue.put('job failed')


#========= Parent =========
jobs = mp.Queue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

init_failures = 0
job_failures = 0
successes = 0
while job_failures + successes < 30 and init_failures < workers_count:
    result = results.get()
    init_failures += int(result == 'init failed')
    job_failures += int(result == 'job failed')
    successes += int(result != 'init failed' and result != 'job failed')
    #print init_failures, job_failures, successes

for ii in range(workers_count):
    jobs.put(None)

Мои изменения:

Измененоjobs быть просто нормальнымQueue (вместоJoinableQueue).Рабочие теперь общаются обратно со специальными результатамиОшибка инициализации а также "работа не удалась ".Главный процесс отслеживает указанные специальные результаты, пока действуют конкретные условия.В итоге ставьстоп" запросы (т.е.None рабочих мест) для любого количества рабочих, независимо от того, Обратите внимание, что не все из них могут быть извлечены из очереди (в случае, если рабочий не смог инициализироваться).

Кстати, ваш оригинальный код был приятным и с ним легко работать. Бит случайных вероятностей довольно крутой.

 jfs05 июн. 2013 г., 20:10
или вы могли бы поставить кортеж(result, error) (ошибка - Нет в случае успеха) в очередь результатов, чтобы избежать внутриполосной связи для ошибок.

Ваш ответ на вопрос