Jak wątkować wiele instancji podprocesów w Pythonie 2.7?
Mam trzy polecenia, które w przeciwnym razie byłyby łatwo połączone w wierszu polecenia w następujący sposób:
$ echo foo | firstCommand - | secondCommand - | thirdCommand - > finalOutput
Innymi słowy,firstCommand
procesyfoo
ze standardowego wejścia i potoku wynik dosecondCommand
, który z kolei przetwarza dane wejściowe i przekazuje dane wyjściowe dothirdCommand
, który przetwarza i przekierowuje swoje wyjście do plikufinalOutput
.
Próbowałem podsumować to w skrypcie Pythona, używając wątków. Chciałbym używać Pythona do manipulowania danymi wyjściowymifirstCommand
przed przekazaniem gosecondCommand
i znowu międzysecondCommand
ithirdCommand
.
Oto fragment kodu, który wydaje się nie działać:
first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)
first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))
first_thread.start()
second_thread.start()
third_thread.start()
first_thread.join()
second_thread.join()
third_thread.join()
first_process.communicate()
second_process.communicate()
third_process.communicate()
# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
chunk = from_stream.read(1024)
while chunk:
to_stream.write(chunk)
to_stream.flush()
chunk = from_stream.read(1024)
def consumeOutputFromFirstCommand(from_stream, to_stream):
while True:
unprocessed_line = from_stream.readline()
if not unprocessed_line:
break
processed_line = some_python_function_that_processes_line(unprocessed_line)
to_stream.write(processed_line)
to_stream.flush()
def consumeOutputFromSecondCommand(from_stream, to_stream):
while True:
unprocessed_line = from_stream.readline()
if not unprocessed_line:
break
processed_line = a_different_python_function_that_processes_line(unprocessed_line)
to_stream.write(processed_line)
to_stream.flush()
Kiedy to uruchomię, skrypt zawiesza się:
$ echo foo | ./myConversionScript.py
** hangs here... **
Jeśli trafięCtrl-C
aby zakończyć skrypt, kod utknął na liniithird_thread.join()
:
C-c C-c
Traceback (most recent call last):
File "./myConversionScript.py", line 786, in <module>
sys.exit(main(*sys.argv))
File "./myConversionScript.py", line 556, in main
third_thread.join()
File "/home/foo/proj/tools/lib/python2.7/threading.py", line 949, in join
self.__block.wait()
File "/home/foo/proj/tools/lib/python2.7/threading.py", line 339, in wait
waiter.acquire()
KeyboardInterrupt
Jeśli nie użyjęthird_process
ithird_thread
zamiast przekazywać dane z wyjścia pierwszego wątku na wejście drugiego wątku, nie ma zawieszenia.
Wydaje się, że coś w trzecim wątku powoduje załamanie, ale nie wiem dlaczego.
Myślałem o tymcommunicate()
jest to, że obsłuży I / O dla trzech procesów, więc nie jestem pewien, dlaczego jest zawieszenie I / O.
Jak uzyskać trzy lub więcej poleceń / procesów pracujących razem, gdzie jeden wątek zużywa dane wyjściowe innego wątku / procesu?
AKTUALIZACJA
OK, zrobiłem kilka zmian, które wydają się pomagać, w oparciu o pewne komentarze tutaj i na innych stronach. Procesy są wykonywanewait()
do uzupełnienia, w ramach metod wątków, Iclose()
potoki po przetworzeniu przez wątek wszystkich danych, które może. Obawiam się, że w przypadku dużych zbiorów danych użycie pamięci będzie bardzo wysokie, ale przynajmniej wszystko działa:
first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)
first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))
first_thread.start()
second_thread.start()
third_thread.start()
first_thread.join()
second_thread.join()
third_thread.join()
first_process.wait()
second_process.wait()
third_process.wait()
# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
chunk = from_stream.read(1024)
while chunk:
to_stream.write(chunk)
to_stream.flush()
chunk = from_stream.read(1024)
def consumeOutputFromFirstCommand(from_stream, to_stream):
while True:
unprocessed_line = from_stream.readline()
if not unprocessed_line:
from_stream.close()
to_stream.close()
break
processed_line = some_python_function_that_processes_line(unprocessed_line)
to_stream.write(processed_line)
to_stream.flush()
def consumeOutputFromSecondCommand(from_stream, to_stream):
while True:
unprocessed_line = from_stream.readline()
if not unprocessed_line:
from_stream.close()
to_stream.close()
break
processed_line = a_different_python_function_that_processes_line(unprocessed_line)
to_stream.write(processed_line)
to_stream.flush()