Выход
документ показывает пример для разделения состояния между процессами, использующими а такжеValue
изArray
библиотека:multiprocessing
из многопроцессорного импорта Process, Value, Array
Будет печатать
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
Мои вопросы
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
Как вы продолжаете передавать информацию другому процессу, а не при создании рабочего процесса?
Как вы можете заставить рабочий процесс блокировать (или приостанавливать) ожидание события от родительского процесса через этот механизм?
Моя платформа - Windows 10. Общая память может быть разделена между процессами, но процессы fork () или spawn () не могут наследовать семафор, блокировку, очередь и т. Д.
Благодарю.
[Обновление 1]
Демо, данное @ Manu-Valdés работает. Но я сделал пример, не работает, возможно, вы могли бы помочь определить проблему.
Ошибка
%%file ./examples/multiprocessing_pool5.py
# This code definitely will not work in Windows as queue object is not fork() along.
import multiprocessing
import os
def f1(q):
x = q.get(True) # Block until something is in the queue
if x == 55:
raise Exception('I do not like 55!')
elif x == 100:
return
else:
print(f'f1({x}) -> {x*x}')
def f2(q):
x = q.get(True) # Block until something is in the queue
if x == 55:
raise Exception('I do not like 55!')
elif x == 100:
return
else:
print(f'f2({x}) -> {x*x}')
def wp_init(q):
#global queue
#queue = q # Point to the global queue in each process
print(f'I am initialized')
def success_cb(result):
print(f'Success returns = {result}')
def failure_cb(result):
print(f'Failure returns = {result}')
if __name__ == '__main__':
np = os.cpu_count() # Number of cores per CPU
queue = multiprocessing.Queue()
pool = multiprocessing.Pool(np, initializer=wp_init, initargs=(queue,))
for x in range(100):
if x % 2 == 0:
f = f1
else:
f = f2
pool.apply_async(f, args=(queue,), callback=success_cb, error_callback=failure_cb)
for x in range(100):
queue.put(x)
# Terminate them but I do not know how to loop through the processes
for _ in range(100):
queue.put(100) # Terminate it
pool.close()
pool.join()
Вы пытаетесь пройти очередь, как только она была запущена. Согласно моему ответу, вы проходите очередь во время инициализации, в вашем случае в multiprocessing.Pool, и затем у вас есть рабочий цикл (который вы не делаете) через элементы очереди. Не вызывайте apply_async и не передавайте очередь: это не удастся.
I am initialized
I am initialized
I am initialized
I am initialized
Failure returns = Queue objects should only be shared between processes through inheritance