Как связать несколько экземпляров подпроцесса в Python 2.7?
У меня есть три команды, которые иначе были бы легко объединены в командной строке следующим образом:
$ echo foo | firstCommand - | secondCommand - | thirdCommand - > finalOutput
Другими словами,firstCommand
процессыfoo
от стандартного ввода и трубы результат кsecondCommand
который, в свою очередь, обрабатывает этот вход и передает его выводthirdCommand
, который выполняет обработку и перенаправляет свой вывод в файл.finalOutput
Я пытался повторить это в сценарии Python, используя потоки. Я'я хотел бы использовать Python для манипулирования выходнымиfirstCommand
прежде чем передать егоsecondCommand
и снова междуsecondCommand
а также .thirdCommand
Вот'Вот фрагмент кода, который не работает:
first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)
first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))
first_thread.start()
second_thread.start()
third_thread.start()
first_thread.join()
second_thread.join()
third_thread.join()
first_process.communicate()
second_process.communicate()
third_process.communicate()
# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
chunk = from_stream.read(1024)
while chunk:
to_stream.write(chunk)
to_stream.flush()
chunk = from_stream.read(1024)
def consumeOutputFromFirstCommand(from_stream, to_stream):
while True:
unprocessed_line = from_stream.readline()
if not unprocessed_line:
break
processed_line = some_python_function_that_processes_line(unprocessed_line)
to_stream.write(processed_line)
to_stream.flush()
def consumeOutputFromSecondCommand(from_stream, to_stream):
while True:
unprocessed_line = from_stream.readline()
if not unprocessed_line:
break
processed_line = a_different_python_function_that_processes_line(unprocessed_line)
to_stream.write(processed_line)
to_stream.flush()
Когда я запускаю это, скрипт зависает:
$ echo foo | ./myConversionScript.py
** hangs here... **
Если я ударилCtrl-C
чтобы завершить скрипт, код застрял в строке:third_thread.join()
C-c C-c
Traceback (most recent call last):
File "./myConversionScript.py", line 786, in
sys.exit(main(*sys.argv))
File "./myConversionScript.py", line 556, in main
third_thread.join()
File "/home/foo/proj/tools/lib/python2.7/threading.py", line 949, in join
self.__block.wait()
File "/home/foo/proj/tools/lib/python2.7/threading.py", line 339, in wait
waiter.acquire()
KeyboardInterrupt
Если я нене использоватьthird_process
а такжеthird_thread
вместо того, чтобы только передавать данные с выхода первого потока на вход второго потока, нет зависания.
Кажется, что-то в третьем потоке может привести к поломке, но я нене знаю почему.
Я думал смыслcommunicate()
является то, что он будет обрабатывать ввод / вывод для трех процессов, поэтому яЯ не уверен, почему происходит зависание ввода-вывода.
Как получить три или более команд / процессов, работающих вместе, где один поток потребляет вывод другого потока / процесса?
ОБНОВИТЬ
Хорошо, я сделал некоторые изменения, которые, кажется, помогают, основываясь на некоторых комментариях здесь и на других сайтах. Процессы сделаны дляwait()
для завершения, и в рамках потоковых методов, яclose()
каналы после того, как поток обработал все данные, которые он может. Меня беспокоит то, что использование памяти будет очень большим для больших наборов данных, но, по крайней мере, все работает:
first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)
first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))
first_thread.start()
second_thread.start()
third_thread.start()
first_thread.join()
second_thread.join()
third_thread.join()
first_process.wait()
second_process.wait()
third_process.wait()
# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
chunk = from_stream.read(1024)
while chunk:
to_stream.write(chunk)
to_stream.flush()
chunk = from_stream.read(1024)
def consumeOutputFromFirstCommand(from_stream, to_stream):
while True:
unprocessed_line = from_stream.readline()
if not unprocessed_line:
from_stream.close()
to_stream.close()
break
processed_line = some_python_function_that_processes_line(unprocessed_line)
to_stream.write(processed_line)
to_stream.flush()
def consumeOutputFromSecondCommand(from_stream, to_stream):
while True:
unprocessed_line = from_stream.readline()
if not unprocessed_line:
from_stream.close()
to_stream.close()
break
processed_line = a_different_python_function_that_processes_line(unprocessed_line)
to_stream.write(processed_line)
to_stream.flush()