Cómo acelerar la comunicación con subprocesos
Estoy usando Python 2subprocess
conthreading
hilos para tomar entrada estándar, procesarlo con binariosA
, B
yC
y escribir datos modificados a la salida estándar.
Este script (llamémoslo:A_to_C.py
) es muy lento y me gustaría aprender a solucionarlo.
El flujo general es el siguiente:
A_process = subprocess.Popen(['A', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
produce_A_thread = threading.Thread(target=produceA, args=(sys.stdin, A_process.stdin))
B_process = subprocess.Popen(['B', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
convert_A_to_B_thread = threading.Thread(target=produceB, args=(A_process.stdout, B_process.stdin))
C_process = subprocess.Popen(['C', '-'], stdin=subprocess.PIPE)
convert_B_to_C_thread = threading.Thread(target=produceC, args=(B_process.stdout, C_process.stdin))
produce_A_thread.start()
convert_A_to_B_thread.start()
convert_B_to_C_thread.start()
produce_A_thread.join()
convert_A_to_B_thread.join()
convert_B_to_C_thread.join()
A_process.wait()
B_process.wait()
C_process.wait()
La idea es que la entrada estándar entra enA_to_C.py
:
A
binario procesa una porción de entrada estándar y creaA
-salida con la funciónproduceA
.losB
binario procesa un trozo deA
salida estándar y creaB
-salida a través de la funciónproduceB
.losC
binario procesa un trozo deB
Salida estándar a través de la funciónproduceC
y escribeC
-salida a salida estándar.Hice perfiles con cProfile y casi todo el tiempo en este script parece gastarse en adquirir bloqueos de hilos.
Por ejemplo, en un trabajo de prueba 417s, 416s (> 99% del tiempo de ejecución total) se gasta en la adquisición de bloqueos de hilo:
$ python
Python 2.6.6 (r266:84292, Nov 21 2013, 10:50:32)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import pstats
>>> p = pstats.Stats('1.profile')
>>> p.sort_stats('cumulative').print_stats(10)
Thu Jun 12 22:19:07 2014 1.profile
1755 function calls (1752 primitive calls) in 417.203 CPU seconds
Ordered by: cumulative time
List reduced from 162 to 10 due to restriction <10>
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.020 0.020 417.203 417.203 A_to_C.py:90(<module>)
1 0.000 0.000 417.123 417.123 A_to_C.py:809(main)
6 0.000 0.000 416.424 69.404 /foo/python/2.7.3/lib/python2.7/threading.py:234(wait)
32 416.424 13.013 416.424 13.013 {method 'acquire' of 'thread.lock' objects}
3 0.000 0.000 416.422 138.807 /foo/python/2.7.3/lib/python2.7/threading.py:648(join)
3 0.000 0.000 0.498 0.166 A_to_C.py:473(which)
37 0.000 0.000 0.498 0.013 A_to_C.py:475(is_exe)
3 0.496 0.165 0.496 0.165 {posix.access}
6 0.000 0.000 0.194 0.032 /foo/python/2.7.3/lib/python2.7/subprocess.py:475(_eintr_retry_call)
3 0.000 0.000 0.191 0.064 /foo/python/2.7.3/lib/python2.7/subprocess.py:1286(wait)
¿Qué estoy haciendo mal con mithreading.Thread
y / osubprocess.Popen
acuerdo que está causando este problema?