Как я могу восстановить возвращаемое значение функции, переданной multiprocessing.Process?

В приведенном ниже примере кода я хочу восстановить возвращаемое значение функцииworker, Как я могу сделать это? Где хранится это значение?

Example Code:

<code>import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs
</code>

Output:

<code>0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
</code>

Я не могу найти соответствующий атрибут в объектах, хранящихся вjobs.

Заранее спасибо, БИК

Ответы на вопрос(9)

что подход, предложенный @sega_sai, лучше. Но это действительно нуждается в примере кода, так что здесь идет:

import multiprocessing
from os import getpid

def worker(procnum):
    print 'I am number %d in process %d' % (procnum, getpid())
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print pool.map(worker, range(5))

Который будет печатать возвращаемые значения:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

Если вы знакомы сmap (встроенный в Python 2) это не должно быть слишком сложным. В противном случае взгляните нассылка sega_Sai.

Обратите внимание, как мало кода требуется. (Также обратите внимание, как процессы используются повторно).

 31 окт. 2016 г., 20:00
Я также думал, что это было связано со скоростью, но когда я кормлюpool.map диапазон 1000000 с использованием более 10 процессов, я вижу не более двух разных пидов.
 29 окт. 2016 г., 19:39
Любые идеи, почему мойgetpid() вернуть все то же значение? Я использую Python3
 05 нояб. 2016 г., 00:04
Здесь & APOS; smy question
 01 нояб. 2016 г., 12:27
Тогда я не уверен. Я думаю, что было бы интересно открыть для этого отдельный вопрос.
 31 окт. 2016 г., 16:30
Я не уверен, как Pool распределяет задачи по работникам. Может быть, все они могут оказаться на одном и том же работнике, если они действительно быстрые? Это происходит последовательно? Также, если вы добавите задержку?

Этот пример показывает, как использовать списокmultiprocessing.Pipe экземпляры для возврата строк из произвольного числа процессов:

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

Это решение использует меньше ресурсов, чемmultiprocessing.Queue который использует

a Pipe at least one Lock a buffer a thread

илиmultiprocessing.SimpleQueue который использует

a Pipe at least one Lock

Очень поучительно посмотреть на источник для каждого из этих типов.

 25 окт. 2016 г., 16:56
всегда ли должен читаться канал, прежде чем к нему можно будет добавить (отправить) новое значение?
 25 окт. 2016 г., 15:15
Что было бы лучшим способом сделать это без превращения каналов в глобальную переменную?
 25 окт. 2016 г., 15:43
Я помещаю все глобальные данные и код в основную функцию, и она работает так же. Это отвечает на ваш вопрос?
 21 сент. 2017 г., 22:41
+1, хороший ответ. Но из-за того, что решение более эффективно, компромисс заключается в том, что вы делаетеPipe за процесс против одногоQueue for all processes. I don't know if that ends up being more efficient in all cases. – sudo Sep 21 '17 at 20:41

Кажется, вы должны использоватьmultiprocessing.Pool вместо этого используйте классы .apply () .apply_async (), map ()

http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult

как это сделать сQueue в любом месте (даже примеры документов Python не порождают несколько процессов), так что вот что я получил после 10 попыток:

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queue это блокирующая потокобезопасная очередь, которую вы можете использовать для хранения возвращаемых значений от дочерних процессов. Таким образом, вы должны передать очередь каждому процессу. Что-то менее очевидное в том, что вы должныget() из очереди перед вамиjoin Processили очередь заполняется и блокирует все.

Update для тех, кто является объектно-ориентированным (протестировано в Python 3.4):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)

Вы можете использоватьexit встроенный, чтобы установить код выхода процесса. Его можно получить изexitcode атрибут процесса:

import multiprocessing

def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
 19 июл. 2018 г., 19:45
Идеально, если вы просто хотите вызвать исключение в родительском процессе при ошибке.
 23 мая 2017 г., 23:50
Имейте в виду, что такой подход может привести к путанице. Обычно процессы должны завершаться с кодом завершения 0, если они завершены без ошибок. Если у вас есть что-нибудь, отслеживающее коды завершения процесса вашей системы, вы можете увидеть эти сообщения как ошибки.

Для тех, кто ищет, как получить ценность отProcess с помощьюQueue:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    print queue.get()  # Prints {"foo": True}
    p.join()
 10 окт. 2016 г., 10:11
Да, он висит там бесконечно. Все мои работники заканчивают (цикл внутри рабочей функции завершается, после этого печатается оператор print для всех работников). Объединение ничего не делает. Если я удалюQueue от моей функции это позволяет мне пройтиjoin()
 06 окт. 2016 г., 19:44
@LaurensKoppenol Вы имеете в виду, что ваш основной код постоянно висит в p.join () и никогда не продолжается? Ваш процесс имеет бесконечный цикл?
 16 авг. 2017 г., 04:47
@ LaurensKoppenol Вы, возможно, не звонитеqueue.put(ret) до звонкаp.start() ? В этом случае рабочий поток будет висеть наqueue.get() навсегда. Вы можете повторить это, скопировав мой фрагмент выше при комментированииqueue.put(ret).
 06 окт. 2016 г., 14:30
когда я помещаю что-то в очередь в моем рабочем процессе, мое соединение никогда не достигается. Любая идея, как это может прийти?
 16 нояб. 2017 г., 20:24
Я отредактировал этот ответ,queue.get() должно произойти доp.join(), Это работает сейчас для меня.

Простое решение:

import multiprocessing

output=[]
data = range(0,10)

def f(x):
    return x**2

def handler():
    p = multiprocessing.Pool(64)
    r=p.map(f, data)
    return r

if __name__ == '__main__':
    output.append(handler())

print(output[0])

Выход:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

так как мне нужно было получить коды ошибок из функции. (Спасибо Vertec !!! это удивительный трюк)

Это также можно сделать с помощьюmanager.list но я думаю, что лучше иметь это в диктовке и хранить список в нем. Таким образом, мы сохраняем функцию и результаты, поскольку не можем быть уверены в том, в каком порядке будет заполняться список.

from multiprocessing import Process
import time
import datetime
import multiprocessing


def func1(fn, m_list):
    print 'func1: starting'
    time.sleep(1)
    m_list[fn] = "this is the first function"
    print 'func1: finishing'
    # return "func1"  # no need for return since Multiprocess doesnt return it =(

def func2(fn, m_list):
    print 'func2: starting'
    time.sleep(3)
    m_list[fn] = "this is function 2"
    print 'func2: finishing'
    # return "func2"

def func3(fn, m_list):
    print 'func3: starting'
    time.sleep(9)
    # if fail wont join the rest because it never populate the dict
    # or do a try/except to get something in return.
    raise ValueError("failed here")
    # if we want to get the error in the manager dict we can catch the error
    try:
        raise ValueError("failed here")
        m_list[fn] = "this is third"
    except:
        m_list[fn] = "this is third and it fail horrible"
        # print 'func3: finishing'
        # return "func3"


def runInParallel(*fns):  # * is to accept any input in list
    start_time = datetime.datetime.now()
    proc = []
    manager = multiprocessing.Manager()
    m_list = manager.dict()
    for fn in fns:
        # print fn
        # print dir(fn)
        p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()  # 5 is the time out

    print datetime.datetime.now() - start_time
    return m_list, proc

if __name__ == '__main__':
    manager, proc = runInParallel(func1, func2, func3)
    # print dir(proc[0])
    # print proc[0]._name
    # print proc[0].name
    # print proc[0].exitcode

    # here you can check what did fail
    for i in proc:
        print i.name, i.exitcode  # name was set up in the Process line 53

    # here will only show the function that worked and where able to populate the 
    # manager dict
    for i, j in manager.items():
        print dir(i)  # things you can do to the function
        print i, j
Решение Вопроса

использованиеобщая переменная общаться. Например, вот так:

import multiprocessing

def worker(procnum, return_dict):
    '''worker function'''
    print str(procnum) + ' represent!'
    return_dict[procnum] = procnum


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print return_dict.values()
 29 сент. 2016 г., 13:08
@dano: Интересно, если мы используем объект Queue (), мы не можем определить порядок, когда каждый процесс возвращает значение. Я имею в виду, если нам нужен порядок в результате, чтобы сделать следующую работу. Как мы можем быть уверены, где именно, какой вывод от какого процесса
 19 апр. 2015 г., 02:54
Я бы порекомендовал использоватьmultiprocessing.Queue, а неManager Вот. ИспользуяManager требует порождения совершенно нового процесса, который является излишним, когдаQueue сделал бы.
 11 июл. 2017 г., 19:01
это прекрасно работает для одновременного запуска нескольких функций и хитрости для получения информации после. знак равно
 01 дек. 2016 г., 15:43
@Catbuilts Вы можете возвратить кортеж из каждого процесса, где одно значение является фактическим возвращаемым значением, о котором вы заботитесь, а другое - уникальным идентификатором из процесса. Но мне также интересно, почему вы должны знать, какой процесс возвращает какое значение. Если это то, что вам на самом деле нужно знать о процессе, или вам нужно соотнести ваш список входов и список выходов? В этом случае я бы рекомендовал использоватьmultiprocessing.Pool.map обработать ваш список рабочих элементов.

Ваш ответ на вопрос