¿Cómo enlazar múltiples instancias de subproceso en Python 2.7?

Tengo tres comandos que de otra manera serían fácilmente encadenados juntos en la línea de comandos así:

$ echo foo | firstCommand - | secondCommand - | thirdCommand - > finalOutput

En otras palabras, lafirstCommand procesosfoo De entrada estándar y canaliza el resultado asecondCommand, que a su vez procesa esa entrada y canaliza su salida athirdCommand, que procesa y redirige su salida al archivo.finalOutput.

He estado tratando de recapitular esto en un script de Python, usando subprocesos. Me gustaría usar Python para manipular la salida defirstCommand antes de pasarlo asecondCommand, y otra vez entresecondCommand ythirdCommand.

Aquí hay un extracto de código que no parece funcionar:

first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)

first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))

first_thread.start()
second_thread.start()
third_thread.start()

first_thread.join()
second_thread.join()
third_thread.join()

first_process.communicate()
second_process.communicate()
third_process.communicate()

# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

Cuando ejecuto esto, el script cuelga:

$ echo foo | ./myConversionScript.py
** hangs here... **

Si golpeoCtrl-C Para terminar el script, el código está atascado en la línea.third_thread.join():

  C-c C-c
Traceback (most recent call last):
  File "./myConversionScript.py", line 786, in <module>
    sys.exit(main(*sys.argv))
  File "./myConversionScript.py", line 556, in main
    third_thread.join()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 949, in join
    self.__block.wait()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 339, in wait
    waiter.acquire()
KeyboardInterrupt

Si no uso unthird_process ythird_threadEn su lugar, solo se pasan los datos de la salida del primer hilo a la entrada del segundo hilo, no hay bloqueo.

Algo sobre el tercer hilo parece hacer que las cosas se rompan, pero no sé por qué.

Pensé el punto decommunicate() es que manejará la E / S para los tres procesos, por lo que no estoy seguro de por qué hay un bloqueo de E / S.

¿Cómo consigo tres o más comandos / procesos trabajando juntos, donde un subproceso consume la salida de otro subproceso / proceso?

ACTUALIZAR

Bien, hice algunos cambios que parecen ayudar, en base a algunos comentarios aquí y en otros sitios. Los procesos se hacen parawait() Para completar, y dentro de los métodos de hilo, meclose() Los tubos una vez que el hilo ha procesado todos los datos que puede. Mi preocupación es que el uso de memoria será muy alto para grandes conjuntos de datos, pero al menos las cosas están funcionando:

first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)

first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))

first_thread.start()
second_thread.start()
third_thread.start()

first_thread.join()
second_thread.join()
third_thread.join()

first_process.wait()
second_process.wait()
third_process.wait()

# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

Respuestas a la pregunta(2)

Su respuesta a la pregunta