Jak wątkować wiele instancji podprocesów w Pythonie 2.7?

Mam trzy polecenia, które w przeciwnym razie byłyby łatwo połączone w wierszu polecenia w następujący sposób:

$ echo foo | firstCommand - | secondCommand - | thirdCommand - > finalOutput

Innymi słowy,firstCommand procesyfoo ze standardowego wejścia i potoku wynik dosecondCommand, który z kolei przetwarza dane wejściowe i przekazuje dane wyjściowe dothirdCommand, który przetwarza i przekierowuje swoje wyjście do plikufinalOutput.

Próbowałem podsumować to w skrypcie Pythona, używając wątków. Chciałbym używać Pythona do manipulowania danymi wyjściowymifirstCommand przed przekazaniem gosecondCommandi znowu międzysecondCommand ithirdCommand.

Oto fragment kodu, który wydaje się nie działać:

first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)

first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))

first_thread.start()
second_thread.start()
third_thread.start()

first_thread.join()
second_thread.join()
third_thread.join()

first_process.communicate()
second_process.communicate()
third_process.communicate()

# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

Kiedy to uruchomię, skrypt zawiesza się:

$ echo foo | ./myConversionScript.py
** hangs here... **

Jeśli trafięCtrl-C aby zakończyć skrypt, kod utknął na liniithird_thread.join():

  C-c C-c
Traceback (most recent call last):
  File "./myConversionScript.py", line 786, in <module>
    sys.exit(main(*sys.argv))
  File "./myConversionScript.py", line 556, in main
    third_thread.join()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 949, in join
    self.__block.wait()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 339, in wait
    waiter.acquire()
KeyboardInterrupt

Jeśli nie użyjęthird_process ithird_threadzamiast przekazywać dane z wyjścia pierwszego wątku na wejście drugiego wątku, nie ma zawieszenia.

Wydaje się, że coś w trzecim wątku powoduje załamanie, ale nie wiem dlaczego.

Myślałem o tymcommunicate() jest to, że obsłuży I / O dla trzech procesów, więc nie jestem pewien, dlaczego jest zawieszenie I / O.

Jak uzyskać trzy lub więcej poleceń / procesów pracujących razem, gdzie jeden wątek zużywa dane wyjściowe innego wątku / procesu?

AKTUALIZACJA

OK, zrobiłem kilka zmian, które wydają się pomagać, w oparciu o pewne komentarze tutaj i na innych stronach. Procesy są wykonywanewait() do uzupełnienia, w ramach metod wątków, Iclose() potoki po przetworzeniu przez wątek wszystkich danych, które może. Obawiam się, że w przypadku dużych zbiorów danych użycie pamięci będzie bardzo wysokie, ale przynajmniej wszystko działa:

first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)

first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))

first_thread.start()
second_thread.start()
third_thread.start()

first_thread.join()
second_thread.join()
third_thread.join()

first_process.wait()
second_process.wait()
third_process.wait()

# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

questionAnswers(2)

yourAnswerToTheQuestion