Zrozumienie Multiprocessing: Shared Memory Management, Locks and Queues w Pythonie

Multiprocessing jest potężnym narzędziem w Pythonie i chcę go głębiej zrozumieć. Chcę wiedzieć, kiedy używaćregularny Zamki iKolejki i kiedy używać wieloprocesorowościMenedżer dzielić się nimi między wszystkie procesy.

Wymyśliłem następujące scenariusze testowe z czterema różnymi warunkami dla wieloprocesowości:

Korzystanie z basenu iNIE Menedżer

Korzystanie z basenu i menedżera

Korzystanie z poszczególnych procesów iNIE Menedżer

Korzystanie z indywidualnych procesów i menedżera

Praca

Wszystkie warunki wykonują funkcję zadaniathe_job. the_job składa się z drukowania, które jest zabezpieczone zamkiem. Co więcej, wejście do funkcji jest po prostu umieszczane w kolejce (aby sprawdzić, czy można ją odzyskać z kolejki). To wejście jest po prostu indeksemidx zrange(10) utworzony w głównym skrypcie o nazwiestart_scenario (pokazane na dole).

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who= ' By run %d \n' % idx
    print who
    lock.release()

    queue.put(idx)

Powodzenie warunku jest zdefiniowane jako doskonale przywołujące dane wejściowe z kolejki, zobacz funkcjęread_queue na dnie.

Warunki

Warunek 1 i 2 nie wymagają wyjaśnień. Warunek 1 obejmuje utworzenie blokady i kolejki oraz przekazanie ich do puli procesów:

def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    mypool.imap(jobfunc, iterator)

    mypool.close()
    mypool.join()

    return read_queue(queue)

(Funkcja pomocniczamake_iterator znajduje się na dole tego wpisu.) Warunki 1 nie działająRuntimeError: Lock objects should only be shared between processes through inheritance.

Warunek 2 jest raczej podobny, ale teraz blokada i kolejka są pod nadzorem kierownika:

def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.imap(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)

W warunku 3 nowe procesy są uruchamiane ręcznie, a blokada i kolejka są tworzone bez menedżera:

def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!

    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)

Warunek 4 jest podobny, ale teraz ponownie za pomocą menedżera:

def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!

    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)

W obu warunkach - 3 i 4 - rozpoczynam nowy proces dla każdego z 10 zadańthe_job z co najwyżejwyniki procesy działające w tym samym czasie. Osiąga się to dzięki następującej funkcji pomocnika:

def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time

    :param jobfunc: Job to do

    :param iterator:

        Iterator over different parameter settings,
        contains a lock and a queue

    :param ncores:

        Number of processes operating at the same time

    """
    keep_running=True
    process_dict = {} # Dict containing all subprocees

    while len(process_dict)>0 or keep_running:

        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():

            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)

        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)

        # If we have less active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                                   args=(task,))
                proc.start()
                process_dict[proc.pid]=proc
            except StopIteration:
                # All tasks have been started
                keep_running=False

        time.sleep(0.1)
Wynik

Niepowodzenie tylko warunku 1 (RuntimeError: Lock objects should only be shared between processes through inheritance), podczas gdy pozostałe 3 warunki odnoszą sukces. Staram się owinąć głowę wokół tego wyniku.

Dlaczego pula musi współdzielić blokadę i kolejkę między wszystkimi procesami, ale poszczególne procesy z warunku 3 nie?

Wiem, że dla warunków puli (1 i 2) wszystkie dane z iteratorów są przekazywane przez wytrawianie, podczas gdy w warunkach pojedynczego procesu (3 i 4) wszystkie dane z iteratorów są przekazywane przez dziedziczenie z głównego procesu (jestem za pomocąLinux). Chyba dopóki pamięć nie zostanie zmieniona w ramach procesu potomnego, dostęp do tej samej pamięci, z której korzysta proces rodzicielski (copy-on-write). Ale jak tylko powieszlock.acquire(), należy to zmienić, a procesy potomne używają różnych blokad umieszczonych gdzieś w pamięci, prawda? W jaki sposób jeden proces potomny wie, że brat aktywował blokadę, która nie jest udostępniana przez menedżera?

Wreszcie, nieco pokrewne jest moje pytanie, jak różne są warunki 3 i 4. Oba mają indywidualne procesy, ale różnią się wykorzystaniem menedżera. Oba są uważane zaważny kod? A może należy unikać korzystania z menedżera, jeśli nie ma takiej potrzeby?

Pełny skrypt

Dla tych, którzy po prostu chcą skopiować i wkleić wszystko, aby wykonać kod, oto pełny skrypt:

__author__ = 'Me and myself'

import multiprocessing as mp
import time

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who= ' By run %d \n' % idx
    print who
    lock.release()

    queue.put(idx)


def read_queue(queue):
    """Turns a qeue into a normal python list."""
    results = []
    while not queue.empty():
        result = queue.get()
        results.append(result)
    return results


def make_iterator(args, lock, queue):
    """Makes an iterator over args and passes the lock an queue to each element."""
    return ((arg, lock, queue) for arg in args)


def start_scenario(scenario_number = 1):
    """Starts one of four multiprocessing scenarios.

    :param scenario_number: Index of scenario, 1 to 4

    """
    args = range(10)
    ncores = 3
    if scenario_number==1:
        result =  scenario_1_pool_no_manager(the_job, args, ncores)

    elif scenario_number==2:
        result =  scenario_2_pool_manager(the_job, args, ncores)

    elif scenario_number==3:
        result =  scenario_3_single_processes_no_manager(the_job, args, ncores)

    elif scenario_number==4:
        result =  scenario_4_single_processes_manager(the_job, args, ncores)

    if result != args:
        print 'Scenario %d fails: %s != %s' % (scenario_number, args, result)
    else:
        print 'Scenario %d successful!' % scenario_number


def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    mypool.map(jobfunc, iterator)

    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.map(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!

    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!

    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time

    :param jobfunc: Job to do

    :param iterator:

        Iterator over different parameter settings,
        contains a lock and a queue

    :param ncores:

        Number of processes operating at the same time

    """
    keep_running=True
    process_dict = {} # Dict containing all subprocees

    while len(process_dict)>0 or keep_running:

        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():

            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)

        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)

        # If we have less active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                                   args=(task,))
                proc.start()
                process_dict[proc.pid]=proc
            except StopIteration:
                # All tasks have been started
                keep_running=False

        time.sleep(0.1)


def main():
    """Runs 1 out of 4 different multiprocessing scenarios"""
    start_scenario(1)


if __name__ == '__main__':
    main()

questionAnswers(1)

yourAnswerToTheQuestion