Multiprocessing Queue.get () hängt

Ich versuche, grundlegende Mehrfachverarbeitung zu implementieren, und bin auf ein Problem gestoßen. Das Python-Skript ist unten angehängt.

import time, sys, random, threading
from multiprocessing import Process
from Queue import Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

append_queue = Queue(10)
database = FrequencyStore()

def add_to_append_queue(_list):
    append_queue.put(_list)

def process_append_queue():
    while True:
        item = append_queue.get()
        database.append(item)
        print("Appended to database in %.4f seconds" % database.append_time)
        append_queue.task_done()
    return

def main():
    database.load_db()
    print("Database loaded in %.4f seconds" % database.load_time)
    append_queue_process = Process(target=process_append_queue)
    append_queue_process.daemon = True
    append_queue_process.start()
    #t = threading.Thread(target=process_append_queue)
    #t.daemon = True
    #t.start()

    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        print("Analyzed file in %.4f seconds" % a._time)
        add_to_append_qu,eue(a.get_results())

    append_queue.join()
    #append_queue_process.join()
    database.save_db()
    print("Database saved in %.4f seconds" % database.save_time)
    sys.exit(0)

if __name__=="__main__":
    main()

The AnalyzeFrequency analysiert die Häufigkeit von Wörtern in einer Datei undget_results() gibt eine sortierte Liste der Wörter und Häufigkeiten zurück. Die Liste ist sehr groß, vielleicht 10000 Einträge.

Diese Liste wird dann an das @ übergebadd_to_append_queue -Methode, die es zu einer Warteschlange hinzufügt. Die process_append_queue nimmt die Elemente nacheinander und fügt die Häufigkeiten zu einer "Datenbank" hinzu. Dieser Vorgang dauert etwas länger als die eigentliche Analyse inmain() Also versuche ich, einen separaten Prozess für diese Methode zu verwenden. Wenn ich dies mit dem Threading-Modul versuche, funktioniert alles einwandfrei, keine Fehler. Wenn ich Process versuche und verwende, hängt das Skript beiitem = append_queue.get().

Könnte mir bitte jemand erklären, was hier vor sich geht, und mich vielleicht auf eine Lösung hinweisen?

Alle Antworten geschätzt!

AKTUALISIERE

Der Beizfehler war meine Schuld, es war nur ein Tippfehler. Jetzt verwende ich die Queue-Klasse in Multiprocessing, aber die append_queue.get () -Methode bleibt hängen. NEUER COD

import time, sys, random
from multiprocessing import Process, Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

append_queue = Queue()
database = FrequencyStore()

def add_to_append_queue(_list):
    append_queue.put(_list)

def process_append_queue():
    while True:
        database.append(append_queue.get())
        print("Appended to database in %.4f seconds" % database.append_time)
    return

def main():
    database.load_db()
    print("Database loaded in %.4f seconds" % database.load_time)
    append_queue_process = Process(target=process_append_queue)
    append_queue_process.daemon = True
    append_queue_process.start()
    #t = threading.Thread(target=process_append_queue)
    #t.daemon = True
    #t.start()

    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        print("Analyzed file in %.4f seconds" % a._time)
        add_to_append_queue(a.get_results())

    #append_queue.join()
    #append_queue_process.join()
    print str(append_queue.qsize())
    database.save_db()
    print("Database saved in %.4f seconds" % database.save_time)
    sys.exit(0)

if __name__=="__main__":
    main()

UPDATE 2

Dies ist der Datenbankcode:

class FrequencyStore:

    def __init__(self):
        self.sorter = Sorter()
        self.db = {}
        self.load_time = -1
        self.save_time = -1
        self.append_time = -1
        self.sort_time = -1

    def load_db(self):
        start_time = time.time()

        try:
            file = open("results.txt", 'r')
        except:
            raise IOError

        self.db = {}
        for line in file:
            word, count = line.strip("\n").split("=")
            self.db[word] = int(count)
        file.close()

        self.load_time = time.time() - start_time

    def save_db(self):
        start_time = time.time()

        _db = []
        for key in self.db:
            _db.append([key, self.db[key]])
        _db = self.sort(_db)

        try:
            file = open("results.txt", 'w')
        except:
            raise IOError

        file.truncate(0)
        for x in _db:
            file.write(x[0] + "=" + str(x[1]) + "\n")
        file.close()

        self.save_time = time.time() - start_time

    def create_sorted_db(self):
        _temp_db = []
        for key in self.db:
            _temp_db.append([key, self.db[key]])
        _temp_db = self.sort(_temp_db)
        _temp_db.reverse()
        return _temp_db

    def get_db(self):
        return self.db

    def sort(self, _list):
        start_time = time.time()

        _list = self.sorter.mergesort(_list)
        _list.reverse()

        self.sort_time = time.time() - start_time
        return _list

    def append(self, _list):
        start_time = time.time()

        for x in _list:
            if x[0] not in self.db:
                self.db[x[0]] = x[1]
            else:
                self.db[x[0]] += x[1]

        self.append_time = time.time() - start_time

Antworten auf die Frage(4)

Ihre Antwort auf die Frage