Lectura de salida del proceso hijo usando python
Estoy usando elsubprocess
Módulo para iniciar un proceso desde python. Quiero poder acceder a la salida (stdout, stderr) tan pronto como esté escrito / almacenado en búfer.
Por ejemplo, imagina que quiero ejecutar un archivo de Python llamadocounter.py
a través desubprocess
. Los contenidos decounter.py
es como sigue:
import sys
for index in range(10):
# Write data to standard out.
sys.stdout.write(str(index))
# Push buffered data to disk.
sys.stdout.flush()
El proceso de los padresEl proceso padre responsable de ejecutar elcounter.py
el ejemplo es el siguiente:
import subprocess
command = ['python', 'counter.py']
process = subprocess.Popen(
cmd,
bufsize=1,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
La cuestiónUtilizando lacounter.py
Por ejemplo, puedo acceder a los datos antes de que el proceso se haya completado. ¡Esto es genial! Esto es exactamente lo que quiero. Sin embargo, eliminando elsys.stdout.flush()
La llamada impide que se acceda a los datos en el momento que lo deseo. ¡Esto es malo! Esto es exactamente lo que no quiero. Mi entendimiento es que elflush()
la llamada obliga a que los datos se escriban en el disco y, antes de que los datos se escriban en el disco, solo existe en un búfer. Recuerda que quiero poder ejecutar casi cualquier proceso. No espero que el proceso realice este tipo de lavado, pero sigo esperando que los datos estén disponibles en tiempo real (o cerca de ellos). ¿Hay una manera de lograr esto?
Una nota rápida sobre el proceso de los padres. Puedes notar que estoy usandobufsize=0
para la línea de búfer. Tenía la esperanza de que esto causaría un lavado al disco para cada línea, pero no parece funcionar de esa manera. ¿Cómo funciona este argumento?
Usted también notará que estoy usandosubprocess.PIPE
. Esto se debe a que parece ser el único valor que produce objetos de E / S entre los procesos padre e hijo. He llegado a esta conclusión mirando elPopen._get_handles
método en elsubprocess
módulo (me refiero a la definición de Windows aquí). Hay dos variables importantes,c2pread
yc2pwrite
que se establecen en base a lastdout
valor pasado a laPopen
constructor. Por ejemplo, sistdout
no está establecido, elc2pread
La variable no está establecida. Este también es el caso cuando se usan descriptores de archivos y objetos similares a archivos. Realmente no sé si esto es significativo o no, pero mi instinto me dice que me gustaría leer y escribir objetos IO por lo que estoy tratando de lograr, por eso elegísubprocess.PIPE
. Estaría muy agradecido si alguien pudiera explicar esto con más detalle. Del mismo modo, si hay una razón convincente para usar algo diferente asubprocess.PIPE
Soy todo oídos.
import time
import subprocess
import threading
import Queue
class StreamReader(threading.Thread):
"""
Threaded object used for reading process output stream (stdout, stderr).
"""
def __init__(self, stream, queue, *args, **kwargs):
super(StreamReader, self).__init__(*args, **kwargs)
self._stream = stream
self._queue = queue
# Event used to terminate thread. This way we will have a chance to
# tie up loose ends.
self._stop = threading.Event()
def stop(self):
"""
Stop thread. Call this function to terminate the thread.
"""
self._stop.set()
def stopped(self):
"""
Check whether the thread has been terminated.
"""
return self._stop.isSet()
def run(self):
while True:
# Flush buffered data (not sure this actually works?)
self._stream.flush()
# Read available data.
for line in iter(self._stream.readline, b''):
self._queue.put(line)
# Breather.
time.sleep(0.25)
# Check whether thread has been terminated.
if self.stopped():
break
cmd = ['python', 'counter.py']
process = subprocess.Popen(
cmd,
bufsize=1,
stdout=subprocess.PIPE,
)
stdout_queue = Queue.Queue()
stdout_reader = StreamReader(process.stdout, stdout_queue)
stdout_reader.daemon = True
stdout_reader.start()
# Read standard out of the child process whilst it is active.
while True:
# Attempt to read available data.
try:
line = stdout_queue.get(timeout=0.1)
print '%s' % line
# If data was not read within time out period. Continue.
except Queue.Empty:
# No data currently available.
pass
# Check whether child process is still active.
if process.poll() != None:
# Process is no longer active.
break
# Process is no longer active. Nothing more to read. Stop reader thread.
stdout_reader.stop()
Aquí estoy realizando la lógica que lee el proceso secundario en un subproceso. Esto permite el escenario en el que la lectura se bloquea hasta que los datos estén disponibles. En lugar de esperar por un período de tiempo potencialmente largo, verificamos si hay datos disponibles, para leerlos dentro de un período de tiempo de espera, y continuamos haciendo un ciclo si no lo hay.
También he intentado otro enfoque utilizando un tipo de lectura sin bloqueo. Este enfoque utiliza elctypes
Módulo para acceder a llamadas al sistema Windows. Tenga en cuenta que no entiendo completamente lo que estoy haciendo aquí; simplemente he tratado de darle sentido a un código de ejemplo que he visto en otras publicaciones. En cualquier caso, el siguiente fragmento de código no resuelve el problema de almacenamiento en búfer. Mi entendimiento es que es solo otra forma de combatir un tiempo de lectura potencialmente largo.
import os
import subprocess
import ctypes
import ctypes.wintypes
import msvcrt
cmd = ['python', 'counter.py']
process = subprocess.Popen(
cmd,
bufsize=1,
stdout=subprocess.PIPE,
)
def read_output_non_blocking(stream):
data = ''
available_bytes = 0
c_read = ctypes.c_ulong()
c_available = ctypes.c_ulong()
c_message = ctypes.c_ulong()
fileno = stream.fileno()
handle = msvcrt.get_osfhandle(fileno)
# Read available data.
buffer_ = None
bytes_ = 0
status = ctypes.windll.kernel32.PeekNamedPipe(
handle,
buffer_,
bytes_,
ctypes.byref(c_read),
ctypes.byref(c_available),
ctypes.byref(c_message),
)
if status:
available_bytes = int(c_available.value)
if available_bytes > 0:
data = os.read(fileno, available_bytes)
print data
return data
while True:
# Read standard out for child process.
stdout = read_output_non_blocking(process.stdout)
print stdout
# Check whether child process is still active.
if process.poll() != None:
# Process is no longer active.
break
Los comentarios son muy apreciados.
Aclamaciones