Lectura de salida del proceso hijo usando python

El contexto

Estoy usando elsubprocess Módulo para iniciar un proceso desde python. Quiero poder acceder a la salida (stdout, stderr) tan pronto como esté escrito / almacenado en búfer.

La solución debe ser compatible con Windows 7. También necesito una solución para sistemas Unix, pero sospecho que el caso de Windows es más difícil de resolver.La solución debe soportar Python 2.6. Actualmente estoy restringido a Python 2.6 pero las soluciones que usan versiones posteriores de Python aún son apreciadas.La solución no debe utilizar bibliotecas de terceros. Idealmente, me encantaría una solución utilizando la biblioteca estándar, pero estoy abierto a sugerencias.La solución debe funcionar para casi cualquier proceso. Supongamos que no hay control sobre el proceso que se está ejecutando.El proceso infantil

Por ejemplo, imagina que quiero ejecutar un archivo de Python llamadocounter.py a través desubprocess. Los contenidos decounter.py es como sigue:

import sys

for index in range(10):

    # Write data to standard out.
    sys.stdout.write(str(index))

    # Push buffered data to disk.
    sys.stdout.flush()
El proceso de los padres

El proceso padre responsable de ejecutar elcounter.py el ejemplo es el siguiente:

import subprocess

command = ['python', 'counter.py']

process = subprocess.Popen(
    cmd,
    bufsize=1,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    ) 
La cuestión

Utilizando lacounter.py Por ejemplo, puedo acceder a los datos antes de que el proceso se haya completado. ¡Esto es genial! Esto es exactamente lo que quiero. Sin embargo, eliminando elsys.stdout.flush() La llamada impide que se acceda a los datos en el momento que lo deseo. ¡Esto es malo! Esto es exactamente lo que no quiero. Mi entendimiento es que elflush() la llamada obliga a que los datos se escriban en el disco y, antes de que los datos se escriban en el disco, solo existe en un búfer. Recuerda que quiero poder ejecutar casi cualquier proceso. No espero que el proceso realice este tipo de lavado, pero sigo esperando que los datos estén disponibles en tiempo real (o cerca de ellos). ¿Hay una manera de lograr esto?

Una nota rápida sobre el proceso de los padres. Puedes notar que estoy usandobufsize=0 para la línea de búfer. Tenía la esperanza de que esto causaría un lavado al disco para cada línea, pero no parece funcionar de esa manera. ¿Cómo funciona este argumento?

Usted también notará que estoy usandosubprocess.PIPE. Esto se debe a que parece ser el único valor que produce objetos de E / S entre los procesos padre e hijo. He llegado a esta conclusión mirando elPopen._get_handles método en elsubprocess módulo (me refiero a la definición de Windows aquí). Hay dos variables importantes,c2pread yc2pwrite que se establecen en base a lastdout valor pasado a laPopen constructor. Por ejemplo, sistdout no está establecido, elc2pread La variable no está establecida. Este también es el caso cuando se usan descriptores de archivos y objetos similares a archivos. Realmente no sé si esto es significativo o no, pero mi instinto me dice que me gustaría leer y escribir objetos IO por lo que estoy tratando de lograr, por eso elegísubprocess.PIPE. Estaría muy agradecido si alguien pudiera explicar esto con más detalle. Del mismo modo, si hay una razón convincente para usar algo diferente asubprocess.PIPE Soy todo oídos.

Método para recuperar datos del proceso hijo
import time
import subprocess
import threading
import Queue


class StreamReader(threading.Thread):
    """
    Threaded object used for reading process output stream (stdout, stderr).   
    """

    def __init__(self, stream, queue, *args, **kwargs):
        super(StreamReader, self).__init__(*args, **kwargs)
        self._stream = stream
        self._queue = queue

        # Event used to terminate thread. This way we will have a chance to 
        # tie up loose ends. 
        self._stop = threading.Event()

    def stop(self):
        """
        Stop thread. Call this function to terminate the thread. 
        """
        self._stop.set()

    def stopped(self):
        """
        Check whether the thread has been terminated.
        """
        return self._stop.isSet()

    def run(self):
        while True:
            # Flush buffered data (not sure this actually works?)
            self._stream.flush()

            # Read available data.
            for line in iter(self._stream.readline, b''):
                self._queue.put(line)

            # Breather.
            time.sleep(0.25)

            # Check whether thread has been terminated.
            if self.stopped():
                break


cmd = ['python', 'counter.py']

process = subprocess.Popen(
    cmd,
    bufsize=1,
    stdout=subprocess.PIPE,
    )

stdout_queue = Queue.Queue()
stdout_reader = StreamReader(process.stdout, stdout_queue)
stdout_reader.daemon = True
stdout_reader.start()

# Read standard out of the child process whilst it is active.  
while True:

    # Attempt to read available data.  
    try:
        line = stdout_queue.get(timeout=0.1)
        print '%s' % line

    # If data was not read within time out period. Continue. 
    except Queue.Empty:
        # No data currently available.
        pass

    # Check whether child process is still active.
    if process.poll() != None:

        # Process is no longer active.
        break

# Process is no longer active. Nothing more to read. Stop reader thread.
stdout_reader.stop()

Aquí estoy realizando la lógica que lee el proceso secundario en un subproceso. Esto permite el escenario en el que la lectura se bloquea hasta que los datos estén disponibles. En lugar de esperar por un período de tiempo potencialmente largo, verificamos si hay datos disponibles, para leerlos dentro de un período de tiempo de espera, y continuamos haciendo un ciclo si no lo hay.

También he intentado otro enfoque utilizando un tipo de lectura sin bloqueo. Este enfoque utiliza elctypes Módulo para acceder a llamadas al sistema Windows. Tenga en cuenta que no entiendo completamente lo que estoy haciendo aquí; simplemente he tratado de darle sentido a un código de ejemplo que he visto en otras publicaciones. En cualquier caso, el siguiente fragmento de código no resuelve el problema de almacenamiento en búfer. Mi entendimiento es que es solo otra forma de combatir un tiempo de lectura potencialmente largo.

import os
import subprocess

import ctypes
import ctypes.wintypes
import msvcrt

cmd = ['python', 'counter.py']

process = subprocess.Popen(
    cmd,
    bufsize=1,
    stdout=subprocess.PIPE,
    )


def read_output_non_blocking(stream):
    data = ''
    available_bytes = 0

    c_read = ctypes.c_ulong()
    c_available = ctypes.c_ulong()
    c_message = ctypes.c_ulong()

    fileno = stream.fileno()
    handle = msvcrt.get_osfhandle(fileno)

    # Read available data.
    buffer_ = None
    bytes_ = 0
    status = ctypes.windll.kernel32.PeekNamedPipe(
        handle,
        buffer_,
        bytes_,
        ctypes.byref(c_read),
        ctypes.byref(c_available),
        ctypes.byref(c_message),
        )

    if status:
        available_bytes = int(c_available.value)

    if available_bytes > 0:
        data = os.read(fileno, available_bytes)
        print data

    return data

while True:

    # Read standard out for child process.
    stdout = read_output_non_blocking(process.stdout)
    print stdout

    # Check whether child process is still active.
    if process.poll() != None:

        # Process is no longer active.
        break

Los comentarios son muy apreciados.

Aclamaciones

Respuestas a la pregunta(2)

Su respuesta a la pregunta