Wie führe ich geschachtelte, hierarchische Pathos Multiprocessing Maps aus?

Nachdem ich einen wesentlichen Teil meines Codes für die Dill-Serialisierung / Beizung erstellt habe, versuche ich auch, Pathos Multiprocessing zu verwenden, um meine Berechnungen zu parallelisieren. Pathos ist eine natürliche Erweiterung des Dills.

Wenn Sie versuchen, verschachtelt auszuführen

from pathos.multiprocessing import ProcessingPool
ProcessingPool().map(fn, args)

inside an otherProcessingPool().map, dann erhalte ich:

AssertionError: daemonic processes are not allowed to have children

Z.B.

from pathos.multiprocessing import ProcessingPool

def triple(x):
    return 3*x

def refork(x):
    from pathos.multiprocessing import ProcessingPool
    return ProcessingPool().map(triple, xrange(5))

ProcessingPool().map(refork, xrange(3))

ergibt

AssertionError: daemonic processes are not allowed to have children

Ich habe versucht mitamap(...).get() ohne Erfolg. Dies ist auf Pathos 0.2.0.

Was ist der beste Weg, um geschachtelte Parallelisierung zu ermöglichen?

Aktualisiere

Ich muss an dieser Stelle ehrlich sein und gestehen, dass ich die Behauptung entfernt habe"daemonic processes are not allowed to have children" von Pathos. Ich habe auch etwas gebaut, das @ kaskadieKeyboardInterrupt Arbeiter und Arbeiter von denen ... Teile der Lösung unten:

def run_parallel(exec_func, exec_args, num_workers_i)
    pool = ProcessingPool(num_workers_i)
    pool.restart(force=True)
    pid_is = pool.map(get_pid_i, xrange(num_workers_i))
    try:
        results = pool.amap(
            exec_func,
            exec_args,
        )
        counter_i = 0
        while not results.ready():
            sleep(2)
            if counter_i % 60 == 0:
                print('Waiting for children running in pool.amap() with PIDs: {}'.format(pid_is))
            counter_i += 1
        results = results.get()
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        print('Ctrl+C received, attempting to terminate pool...')
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise
    except:
        print('Attempting to close parallel after exception: {}'.format(sys.exc_info()[0]))
        cls.hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise


def hard_kill_pool(pid_is, pool):
    for pid_i in pid_is:
        os.kill(pid_i, signal.SIGINT)  # sending Ctrl+C
    pool.terminate()

Funktioniert anscheinend mit Konsole und IPython-Notebook (mit Stopp-Taste), ist jedoch nicht sicher, ob es in allen Eckfällen 100% korrekt ist.

Antworten auf die Frage(2)

Ihre Antwort auf die Frage