Wie führe ich geschachtelte, hierarchische Pathos Multiprocessing Maps aus?
Nachdem ich einen wesentlichen Teil meines Codes für die Dill-Serialisierung / Beizung erstellt habe, versuche ich auch, Pathos Multiprocessing zu verwenden, um meine Berechnungen zu parallelisieren. Pathos ist eine natürliche Erweiterung des Dills.
Wenn Sie versuchen, verschachtelt auszuführen
from pathos.multiprocessing import ProcessingPool
ProcessingPool().map(fn, args)
inside an otherProcessingPool().map
, dann erhalte ich:
AssertionError: daemonic processes are not allowed to have children
Z.B.
from pathos.multiprocessing import ProcessingPool
def triple(x):
return 3*x
def refork(x):
from pathos.multiprocessing import ProcessingPool
return ProcessingPool().map(triple, xrange(5))
ProcessingPool().map(refork, xrange(3))
ergibt
AssertionError: daemonic processes are not allowed to have children
Ich habe versucht mitamap(...).get()
ohne Erfolg. Dies ist auf Pathos 0.2.0.
Was ist der beste Weg, um geschachtelte Parallelisierung zu ermöglichen?
Aktualisiere
Ich muss an dieser Stelle ehrlich sein und gestehen, dass ich die Behauptung entfernt habe"daemonic processes are not allowed to have children"
von Pathos. Ich habe auch etwas gebaut, das @ kaskadieKeyboardInterrupt
Arbeiter und Arbeiter von denen ... Teile der Lösung unten:
def run_parallel(exec_func, exec_args, num_workers_i)
pool = ProcessingPool(num_workers_i)
pool.restart(force=True)
pid_is = pool.map(get_pid_i, xrange(num_workers_i))
try:
results = pool.amap(
exec_func,
exec_args,
)
counter_i = 0
while not results.ready():
sleep(2)
if counter_i % 60 == 0:
print('Waiting for children running in pool.amap() with PIDs: {}'.format(pid_is))
counter_i += 1
results = results.get()
pool.close()
pool.join()
except KeyboardInterrupt:
print('Ctrl+C received, attempting to terminate pool...')
hard_kill_pool(pid_is, pool) # sending Ctrl+C
raise
except:
print('Attempting to close parallel after exception: {}'.format(sys.exc_info()[0]))
cls.hard_kill_pool(pid_is, pool) # sending Ctrl+C
raise
def hard_kill_pool(pid_is, pool):
for pid_i in pid_is:
os.kill(pid_i, signal.SIGINT) # sending Ctrl+C
pool.terminate()
Funktioniert anscheinend mit Konsole und IPython-Notebook (mit Stopp-Taste), ist jedoch nicht sicher, ob es in allen Eckfällen 100% korrekt ist.