Wie alle pool.apply_async-Prozesse angehalten werden, sobald ein Prozess eine Übereinstimmung in python gefunden hat

Ich habe den folgenden Code, der Multiprocessing nutzt, um eine große Liste zu durchlaufen und eine Übereinstimmung zu finden. Wie kann ich alle Prozesse stoppen, sobald eine Übereinstimmung in einem der Prozesse gefunden wurde? Ich habe Beispiele gesehen, aber keine davon scheint in das zu passen, was ich hier tue.

#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools

alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts

def do_job(first_bits):
    for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
        # CHECK FOR MATCH HERE
        print(''.join(x))
        # EXIT ALL PROCESSES IF MATCH FOUND

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    results = []

    for i in range(num_parts):
        if i == num_parts - 1:
            first_bit = alphabet[part_size * i :]
        else:
            first_bit = alphabet[part_size * i : part_size * (i+1)]
        pool.apply_async(do_job, (first_bit,))

    pool.close()
    pool.join()

Vielen Dank für Ihre Zeit

UPDATE 1:

Ich habe die Änderungen implementiert, die in dem großartigen Ansatz von @ShadowRanger vorgeschlagen wurden, und es funktioniert fast so, wie ich es mir wünsche. Deshalb habe ich einige Protokollierungen hinzugefügt, um einen Hinweis auf den Fortschritt zu geben, und einen entsprechenden 'Test'-Schlüssel eingefügt. Ich möchte die iNumberOfProcessors unabhängig von den num_parts erhöhen / verringern können. In diesem Stadium, wenn ich beide bei 4 habe, funktioniert alles wie erwartet, 4 Prozesse drehen sich (einer extra für die Konsole). Wenn ich die iNumberOfProcessors = 6 ändere, werden 6 Prozesse hochgefahren, von denen jedoch nur einige eine CPU-Auslastung aufweisen. Es sieht also so aus, als wären 2 im Leerlauf. Wo ich wie in meiner vorherigen Lösung oben die Anzahl der Kerne erhöhen konnte, ohne die Anzahl der Teile zu erhöhen, und alle Prozesse verwendet wurden.

Ich bin mir nicht sicher, wie ich diesen neuen Ansatz umgestalten soll, um die gleiche Funktionalität zu erhalten. Können Sie einen Blick darauf werfen und mir eine Anleitung für das Refactoring geben, das erforderlich ist, damit iNumberOfProcessors und num_parts unabhängig voneinander festgelegt werden können und alle Prozesse weiterhin verwendet werden?

Hier ist der aktualisierte Code:

#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools

alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts
iProgressInterval = 10000
iNumberOfProcessors = 6

def do_job(first_bits):
    iAttemptNumber = 0
    iLastProgressUpdate = 0
    for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
        sKey = ''.join(x)
        iAttemptNumber = iAttemptNumber + 1
        if iLastProgressUpdate + iProgressInterval <= iAttemptNumber:
            iLastProgressUpdate = iLastProgressUpdate + iProgressInterval
            print("Attempt#:", iAttemptNumber, "Key:", sKey)
        if sKey == 'test':
            print("KEY FOUND!! Attempt#:", iAttemptNumber, "Key:", sKey)
            return True

def get_part(i):
    if i == num_parts - 1:
        first_bit = alphabet[part_size * i :]
    else:
        first_bit = alphabet[part_size * i : part_size * (i+1)]
    return first_bit

if __name__ == '__main__':
    # with statement with Py3 multiprocessing.Pool terminates when block exits
    with multiprocessing.Pool(processes = iNumberOfProcessors) as pool:

        # Don't need special case for final block; slices can 
        for gotmatch in pool.imap_unordered(do_job, map(get_part, range(num_parts))):
             if gotmatch:
                 break
        else:
             print("No matches found")

UPDATE 2:

Ok hier ist mein Versuch, @noxdafox Vorschlag zu versuchen. Ich habe das Folgende auf der Grundlage des Links zusammengestellt, den er mit seinem Vorschlag bereitgestellt hat. Leider bekomme ich beim Ausführen den Fehler:

... Zeile 322, in apply_async erhöhe ValueError ("Pool läuft nicht") ValueError: Pool läuft nicht

Kann mir jemand eine Anleitung geben, wie ich das zum Laufen bringen kann.

rundsätzlich ist das Problem, dass mein erster Versuch eine Mehrfachverarbeitung durchführte, aber nicht das Abbrechen aller Prozesse unterstützte, sobald eine Übereinstimmung gefunden wurd

Mein zweiter Versuch (basierend auf dem @ ShadowRanger-Vorschlag) löste dieses Problem, unterbrach jedoch die Funktionalität, die Anzahl der Prozesse und die Anzahl der Teile unabhängig voneinander skalieren zu können. Dies ist etwas, was mein erster Versuch tun könnte.

Mein dritter Versuch (basierend auf dem @noxdafox-Vorschlag) löst den oben beschriebenen Fehler aus.

Wenn mir jemand eine Anleitung geben kann, wie ich die Funktionalität meines ersten Versuchs aufrechterhalten kann (indem ich die Anzahl der Prozesse und die Anzahl der Teile unabhängig voneinander skalieren kann), und die Funktionalität hinzufügen kann, alle Prozesse abzubrechen, sobald eine Übereinstimmung gefunden wurde, wäre das sehr viel geschätzt.

Vielen Dank für Ihre Zeit

Hier ist der Code aus meinem dritten Versuch, der auf dem Vorschlag von @noxdafox basiert:

#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools

alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts
iProgressInterval = 10000
iNumberOfProcessors = 4


def find_match(first_bits):
    iAttemptNumber = 0
    iLastProgressUpdate = 0
    for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
        sKey = ''.join(x)
        iAttemptNumber = iAttemptNumber + 1
        if iLastProgressUpdate + iProgressInterval <= iAttemptNumber:
            iLastProgressUpdate = iLastProgressUpdate + iProgressInterval
            print("Attempt#:", iAttemptNumber, "Key:", sKey)
        if sKey == 'test':
            print("KEY FOUND!! Attempt#:", iAttemptNumber, "Key:", sKey)
            return True

def get_part(i):
    if i == num_parts - 1:
        first_bit = alphabet[part_size * i :]
    else:
        first_bit = alphabet[part_size * i : part_size * (i+1)]
    return first_bit

def grouper(iterable, n, fillvalue=None):
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)

class Worker():

    def __init__(self, workers):
        self.workers = workers

    def callback(self, result):
        if result:
            self.pool.terminate()

    def do_job(self):
        print(self.workers)
        pool = multiprocessing.Pool(processes=self.workers)
        for part in grouper(alphabet, part_size):
            pool.apply_async(do_job, (part,), callback=self.callback)
        pool.close()
        pool.join()
        print("All Jobs Queued")

if __name__ == '__main__':
    w = Worker(4)
    w.do_job()

Antworten auf die Frage(4)

Ihre Antwort auf die Frage