Grundlegendes zu Multiprozessoren: Shared Memory Management, Sperren und Warteschlangen in Python

Multiprozessing ist ein mächtiges Werkzeug in Python, und ich möchte es genauer verstehen. Ich möchte wissen, wann ich es verwenden sollregulär Schlösser undWarteschlangen und wann eine Mehrfachverarbeitung verwendet werden sollManager diese unter allen Prozessen zu teilen.

Ich habe die folgenden Testszenarien mit vier verschiedenen Bedingungen für die Mehrfachverarbeitung entwickelt:

Mit einem Pool undNEIN Manager

Verwenden eines Pools und eines Managers

Mit individuellen Prozessen undNEIN Manager

Verwendung einzelner Prozesse und eines Managers

Die Arbeit

Alle Bedingungen führen eine Jobfunktion austhe_job. the_job besteht aus etwas Druck, der durch ein Schloss gesichert ist. Außerdem wird die Eingabe in die Funktion einfach in eine Warteschlange gestellt (um zu sehen, ob sie aus der Warteschlange wiederhergestellt werden kann). Diese Eingabe ist einfach ein Indexidx vonrange(10) erstellt im Hauptskript aufgerufenstart_scenario (unten gezeigt).

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who= ' By run %d \n' % idx
    print who
    lock.release()

    queue.put(idx)

Der Erfolg einer Bedingung ist definiert als perfektes Abrufen der Eingabe aus der Warteschlange, siehe Funktionread_queue unten.

Die Voraussetzungen

Bedingung 1 und 2 sind eher selbsterklärend. Bedingung 1 besteht darin, eine Sperre und eine Warteschlange zu erstellen und diese an einen Prozesspool zu übergeben:

def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    mypool.imap(jobfunc, iterator)

    mypool.close()
    mypool.join()

    return read_queue(queue)

(Die Hilfsfunktionmake_iterator wird am Ende dieses Beitrags angegeben.) Bedingung 1 scheitert mitRuntimeError: Lock objects should only be shared between processes through inheritance.

Bedingung 2 ist ziemlich ähnlich, aber jetzt werden die Sperre und die Warteschlange von einem Manager überwacht:

def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.imap(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)

In Bedingung 3 werden neue Prozesse manuell gestartet und die Sperre und die Warteschlange werden ohne einen Manager erstellt:

def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!

    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)

Bedingung 4 ist ähnlich, verwendet jetzt aber wieder einen Manager:

def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!

    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)

In beiden Fällen - 3 und 4 - starte ich für jede der 10 Aufgaben vonthe_job mit höchstensNcores Prozesse arbeiten zur gleichen Zeit. Dies wird mit folgender Hilfsfunktion erreicht:

def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time

    :param jobfunc: Job to do

    :param iterator:

        Iterator over different parameter settings,
        contains a lock and a queue

    :param ncores:

        Number of processes operating at the same time

    """
    keep_running=True
    process_dict = {} # Dict containing all subprocees

    while len(process_dict)>0 or keep_running:

        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():

            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)

        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)

        # If we have less active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                                   args=(task,))
                proc.start()
                process_dict[proc.pid]=proc
            except StopIteration:
                # All tasks have been started
                keep_running=False

        time.sleep(0.1)
Das Ergebnis

Nur Bedingung 1 schlägt fehl (RuntimeError: Lock objects should only be shared between processes through inheritance) während die anderen 3 Bedingungen erfolgreich sind. Ich versuche, meinen Kopf um dieses Ergebnis zu wickeln.

Warum muss der Pool eine Sperre und eine Warteschlange für alle Prozesse gemeinsam nutzen, die einzelnen Prozesse aus Bedingung 3 jedoch nicht?

Was ich weiß, ist, dass für die Poolbedingungen (1 und 2) alle Daten von den Iteratoren durch Beizen übergeben werden, während in den Einzelprozessbedingungen (3 und 4) alle Daten von den Iteratoren durch Vererbung vom Hauptprozess übergeben werden (I am mitLinux). Ich denke, bis der Speicher innerhalb eines untergeordneten Prozesses geändert wird, wird auf denselben Speicher zugegriffen, den der übergeordnete Prozess verwendet (Copy-on-Write). Aber sobald man sagtlock.acquire()Sollte dies geändert werden und die untergeordneten Prozesse verwenden unterschiedliche Sperren, die sich an einer anderen Stelle im Speicher befinden, nicht wahr? Woher weiß ein untergeordneter Prozess, dass ein Bruder eine Sperre aktiviert hat, die nicht über einen Manager freigegeben wurde?

Schließlich ist meine Frage etwas verwandt, wie sehr sich die Bedingungen 3 und 4 unterscheiden. Beide haben individuelle Prozesse, unterscheiden sich jedoch in der Verwendung eines Managers. Werden beide alsgültig Code? Oder sollte man auf einen Manager verzichten, wenn man eigentlich keinen braucht?

Vollständiges Skript

Für diejenigen, die einfach alles kopieren und einfügen möchten, um den Code auszuführen, ist hier das vollständige Skript:

__author__ = 'Me and myself'

import multiprocessing as mp
import time

def the_job(args):
    """The job for multiprocessing.

    Prints some stuff secured by a lock and 
    finally puts the input into a queue.

    """
    idx = args[0]
    lock = args[1]
    queue=args[2]

    lock.acquire()
    print 'I'
    print 'was '
    print 'here '
    print '!!!!'
    print '1111'
    print 'einhundertelfzigelf\n'
    who= ' By run %d \n' % idx
    print who
    lock.release()

    queue.put(idx)


def read_queue(queue):
    """Turns a qeue into a normal python list."""
    results = []
    while not queue.empty():
        result = queue.get()
        results.append(result)
    return results


def make_iterator(args, lock, queue):
    """Makes an iterator over args and passes the lock an queue to each element."""
    return ((arg, lock, queue) for arg in args)


def start_scenario(scenario_number = 1):
    """Starts one of four multiprocessing scenarios.

    :param scenario_number: Index of scenario, 1 to 4

    """
    args = range(10)
    ncores = 3
    if scenario_number==1:
        result =  scenario_1_pool_no_manager(the_job, args, ncores)

    elif scenario_number==2:
        result =  scenario_2_pool_manager(the_job, args, ncores)

    elif scenario_number==3:
        result =  scenario_3_single_processes_no_manager(the_job, args, ncores)

    elif scenario_number==4:
        result =  scenario_4_single_processes_manager(the_job, args, ncores)

    if result != args:
        print 'Scenario %d fails: %s != %s' % (scenario_number, args, result)
    else:
        print 'Scenario %d successful!' % scenario_number


def scenario_1_pool_no_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITHOUT a Manager for the lock and queue.

    FAILS!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    mypool.map(jobfunc, iterator)

    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_2_pool_manager(jobfunc, args, ncores):
    """Runs a pool of processes WITH a Manager for the lock and queue.

    SUCCESSFUL!

    """
    mypool = mp.Pool(ncores)
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)
    mypool.map(jobfunc, iterator)
    mypool.close()
    mypool.join()

    return read_queue(queue)


def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITHOUT a Manager,

    SUCCESSFUL!

    """
    lock = mp.Lock()
    queue = mp.Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def scenario_4_single_processes_manager(jobfunc, args, ncores):
    """Runs an individual process for every task WITH a Manager,

    SUCCESSFUL!

    """
    lock = mp.Manager().Lock()
    queue = mp.Manager().Queue()

    iterator = make_iterator(args, lock, queue)

    do_job_single_processes(jobfunc, iterator, ncores)

    return read_queue(queue)


def do_job_single_processes(jobfunc, iterator, ncores):
    """Runs a job function by starting individual processes for every task.

    At most `ncores` processes operate at the same time

    :param jobfunc: Job to do

    :param iterator:

        Iterator over different parameter settings,
        contains a lock and a queue

    :param ncores:

        Number of processes operating at the same time

    """
    keep_running=True
    process_dict = {} # Dict containing all subprocees

    while len(process_dict)>0 or keep_running:

        terminated_procs_pids = []
        # First check if some processes did finish their job
        for pid, proc in process_dict.iteritems():

            # Remember the terminated processes
            if not proc.is_alive():
                terminated_procs_pids.append(pid)

        # And delete these from the process dict
        for terminated_proc in terminated_procs_pids:
            process_dict.pop(terminated_proc)

        # If we have less active processes than ncores and there is still
        # a job to do, add another process
        if len(process_dict) < ncores and keep_running:
            try:
                task = iterator.next()
                proc = mp.Process(target=jobfunc,
                                                   args=(task,))
                proc.start()
                process_dict[proc.pid]=proc
            except StopIteration:
                # All tasks have been started
                keep_running=False

        time.sleep(0.1)


def main():
    """Runs 1 out of 4 different multiprocessing scenarios"""
    start_scenario(1)


if __name__ == '__main__':
    main()

Antworten auf die Frage(1)

Ihre Antwort auf die Frage