Как связать несколько экземпляров подпроцесса в Python 2.7?

У меня есть три команды, которые иначе были бы легко объединены в командной строке следующим образом:

$ echo foo | firstCommand - | secondCommand - | thirdCommand - > finalOutput

Другими словами,firstCommand процессыfoo от стандартного ввода и трубы результат кsecondCommandкоторый, в свою очередь, обрабатывает этот вход и передает его выводthirdCommand, который выполняет обработку и перенаправляет свой вывод в файл.finalOutput

Я пытался повторить это в сценарии Python, используя потоки. Я'я хотел бы использовать Python для манипулирования выходнымиfirstCommand прежде чем передать егоsecondCommandи снова междуsecondCommand а также .thirdCommand

Вот'Вот фрагмент кода, который не работает:

first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)

first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))

first_thread.start()
second_thread.start()
third_thread.start()

first_thread.join()
second_thread.join()
third_thread.join()

first_process.communicate()
second_process.communicate()
third_process.communicate()

# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

Когда я запускаю это, скрипт зависает:

$ echo foo | ./myConversionScript.py
** hangs here... **

Если я ударилCtrl-C чтобы завершить скрипт, код застрял в строке:third_thread.join()

  C-c C-c
Traceback (most recent call last):
  File "./myConversionScript.py", line 786, in 
    sys.exit(main(*sys.argv))
  File "./myConversionScript.py", line 556, in main
    third_thread.join()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 949, in join
    self.__block.wait()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 339, in wait
    waiter.acquire()
KeyboardInterrupt

Если я нене использоватьthird_process а такжеthird_threadвместо того, чтобы только передавать данные с выхода первого потока на вход второго потока, нет зависания.

Кажется, что-то в третьем потоке может привести к поломке, но я нене знаю почему.

Я думал смыслcommunicate() является то, что он будет обрабатывать ввод / вывод для трех процессов, поэтому яЯ не уверен, почему происходит зависание ввода-вывода.

Как получить три или более команд / процессов, работающих вместе, где один поток потребляет вывод другого потока / процесса?

ОБНОВИТЬ

Хорошо, я сделал некоторые изменения, которые, кажется, помогают, основываясь на некоторых комментариях здесь и на других сайтах. Процессы сделаны дляwait() для завершения, и в рамках потоковых методов, яclose() каналы после того, как поток обработал все данные, которые он может. Меня беспокоит то, что использование памяти будет очень большим для больших наборов данных, но, по крайней мере, все работает:

first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)

first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))

first_thread.start()
second_thread.start()
third_thread.start()

first_thread.join()
second_thread.join()
third_thread.join()

first_process.wait()
second_process.wait()
third_process.wait()

# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

Ответы на вопрос(2)

Ваш ответ на вопрос