Como executar várias redes neurais Keras em paralelo
Estou tentando usar o Keras para executar um algoritmo de aprendizado por reforço. Neste algoritmo, estou treinando uma rede neural. O que é diferente de outros problemas de aprendizagem é que eu preciso usar a própria rede neural para gerar dados de treinamento e repeti-la após a atualização. Estou com problemas quando estou tentando gerar dados de treinamento em paralelo.
O problema é que não posso dizer ao Theano para usar a GPU durante o treinamento, porque ele também usará a GPU ao gerar dados de treinamento e causará problemas se for invocado por vários processos.
Além disso, eu Theano não funcionará no modo multithread, mesmo quando eu escrevoTHEANO_FLAGS='floatX=float32,device=cpu,openmp=True' OMP_NUM_THREADS=4
antespython
comando também. Isso não causará nenhum erro, mas posso ver que há apenas um thread em execução.
Aqui estão os meus códigos. É uma versão simplificada.
import numpy
from numpy import array
import copy
from time import time
import multiprocessing
from keras.models import Sequential
from keras.layers import Dense, Activation
from keras.optimizers import SGD
from keras.models import model_from_json
def runEpisode(qn):
# Some codes that need qn.predict
result = qn.predict(array([[1, 3]])) # That's just for demo
return ([1, 2], 2) # Generated some training data, (X, Y)
def runMultiEpisode(qn, queue, event, nEpisode): # 'queue' is used to return result. 'event' is set when terminates.
# Run several Episodes
result = []
for i in range(nEpisode):
result.append(runEpisode(qn))
# Return the result to main process
queue.put(result)
event.set()
def runEpisode_MultiProcess(nThread, qn, nEpisode):
processes = []
queues = []
events = []
rewardCount = 0.0
# Start processes
for i in range(nThread):
queue = multiprocessing.Queue()
event = multiprocessing.Event()
p = multiprocessing.Process(target = runMultiEpisode,
args = (qn, queue, event, int(nEpisode/nThread)))
p.start()
processes.append(p)
queues.append(queue)
events.append(event)
# Wait for result
for event in events:
event.wait()
# Gather results
result = []
for queue in queues:
result += queue.get()
print 'Got', len(result), 'samples'
return result
def train(qn, nEpisode):
newqn = copy.copy(qn)
# Generate training data
print 'Running episodes'
t = time()
result = runEpisode_MultiProcess(2, qn, nEpisode)
print 'Time:', time() - t
# Fit the neural network
print 'Begin fitting'
t = time()
newqn.fit([x[0] for x in result], [x[1] for x in result])
return newqn
qn = Sequential([Dense(2, input_dim = 2, activation = 'sigmoid'),
Dense(1, activation = 'linear'),])
qn.compile(optimizer =
SGD(lr = 0.001,
momentum = 0.9),
loss = 'mse',
metrics = [])
train(qn, 100)
Então, eu tenho duas perguntas:
Posso dizer à Theano para usar a GPU apenas ao ajustar os dados?O que pode fazer com que o Theano não use o multi-threading?Edit: Acho que o Theano usará o multi-threading em minha própria máquina, mas não em um servidor remoto. Estou pensando se isso é causado por alguma configuração errada.