Como acelerar a comunicação com subprocessos
Eu estou usando Python 2subprocess
comthreading
threads para obter entrada padrão, processe-a com bináriosA
, B
eC
e escreva dados modificados na saída padrão.
Este script (vamos chamá-lo:A_to_C.py
) é muito lento e eu gostaria de aprender como corrigi-lo.
O fluxo geral é o seguinte:
A_process = subprocess.Popen(['A', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
produce_A_thread = threading.Thread(target=produceA, args=(sys.stdin, A_process.stdin))
B_process = subprocess.Popen(['B', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
convert_A_to_B_thread = threading.Thread(target=produceB, args=(A_process.stdout, B_process.stdin))
C_process = subprocess.Popen(['C', '-'], stdin=subprocess.PIPE)
convert_B_to_C_thread = threading.Thread(target=produceC, args=(B_process.stdout, C_process.stdin))
produce_A_thread.start()
convert_A_to_B_thread.start()
convert_B_to_C_thread.start()
produce_A_thread.join()
convert_A_to_B_thread.join()
convert_B_to_C_thread.join()
A_process.wait()
B_process.wait()
C_process.wait()
A idéia é que a entrada padrão entre emA_to_C.py
:
A
processos binários um pedaço de entrada padrão e criaA
saída com a funçãoproduceA
.oB
processos binários um pedaço deA
da saída padrão e criaB
saída através da funçãoproduceB
.oC
processos binários um pedaço deB
saída padrão da função através da funçãoproduceC
e escreveC
saída para saída padrão.Eu fiz o perfil com cProfile e quase todo o tempo neste script parece ser gasto na aquisição de bloqueios de thread.
Por exemplo, em um trabalho de teste 417s, 416s (> 99% do tempo de execução total) é gasto na aquisição de bloqueios de encadeamento:
$ python
Python 2.6.6 (r266:84292, Nov 21 2013, 10:50:32)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import pstats
>>> p = pstats.Stats('1.profile')
>>> p.sort_stats('cumulative').print_stats(10)
Thu Jun 12 22:19:07 2014 1.profile
1755 function calls (1752 primitive calls) in 417.203 CPU seconds
Ordered by: cumulative time
List reduced from 162 to 10 due to restriction <10>
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.020 0.020 417.203 417.203 A_to_C.py:90(<module>)
1 0.000 0.000 417.123 417.123 A_to_C.py:809(main)
6 0.000 0.000 416.424 69.404 /foo/python/2.7.3/lib/python2.7/threading.py:234(wait)
32 416.424 13.013 416.424 13.013 {method 'acquire' of 'thread.lock' objects}
3 0.000 0.000 416.422 138.807 /foo/python/2.7.3/lib/python2.7/threading.py:648(join)
3 0.000 0.000 0.498 0.166 A_to_C.py:473(which)
37 0.000 0.000 0.498 0.013 A_to_C.py:475(is_exe)
3 0.496 0.165 0.496 0.165 {posix.access}
6 0.000 0.000 0.194 0.032 /foo/python/2.7.3/lib/python2.7/subprocess.py:475(_eintr_retry_call)
3 0.000 0.000 0.191 0.064 /foo/python/2.7.3/lib/python2.7/subprocess.py:1286(wait)
O que estou fazendo de errado com o meuthreading.Thread
e / ousubprocess.Popen
arranjo que está causando esse problema?