Asyncio detectando desconexão trava
Estou usando o Asyncio no Python 3.4, tentarei explicar o que estou fazendo até agora e o que (acho) está causando o problema.
Por um lado, tenho uma estrutura de conexão UDP com operações de bloqueio, estou obtendo os dados que recebo desse fluxo e criando json que passo para o cliente no formato SSE. Tudo isso está funcionando muito bem.
O problema que eu estou enfrentando é que não consigo lidar com as desconexões de clientes corretamente se eu não fizer nada e se um cliente desconectar, começarei a receber este erro:
WARNING [selector_events:613] socket.send() raised exception.
Como o loop ainda está em execução, estive procurando maneiras de quebrar o loop de maneira limpa e acionar o .close (), mas estou enfrentando problemas com os exemplos que encontrei e não há muitos recursos online.
O exemplo que parece realmente funcionar é tentar ler uma linha do cliente e, se for uma string vazia, significa que o cliente foi desconectado.
while True:
data = (yield from client_reader.readline())
if not data: #client disconnected
break
no entanto, após cerca de dez mensagens, todas as mensagens para o cliente são interrompidas, acho que isso ocorre porque ele está pendurado em "data = (rendimento de client_reader.readline ())" depois que ele trava se eu fechar o cliente e ele é desligado corretamente e "End Conexão "é chamado. Alguma idéia de por que pode estar pendurado? Acho que tenho um bom domínio do Asyncio neste momento, mas este está me intrigando.
Nota: location () e status () são minhas duas chamadas para obter informações do soquete UDP - eu as executei com êxito sem problemas por muitas horas com o mesmo código - menos as linhas de desconexão do cliente.
clients = {}
def accept_client(client_reader, client_writer):
task = asyncio.Task(handle_client(client_reader, client_writer))
clients[task] = (client_writer)
def client_done(task):
del clients[task]
client_writer.close()
log.info("End Connection")
log.info("New Connection")
task.add_done_callback(client_done)
@asyncio.coroutine
def handle_client(client_reader, client_writer):
data = {'result':{'status':'Connection Ready'}}
yield from postmessage(data,client_writer)
while True:
data = (yield from client_reader.readline())
if not data: #client disconnected
break
data = yield from asyncio.wait_for(location(),
timeout=1.0)
yield from postmessage(data,client_writer)
data = yield from asyncio.wait_for(status(),
timeout=1.0)
yield from postmessage(data,client_writer)
@asyncio.coroutine
def postmessage(data, client_writer):
mimetype=('text/event-stream')
response = ('data: {0}\n\n'.format(data).encode('utf-8'))
client_writer.write(response)
client_writer.drain()
Atualização: se eu adicionar um tempo limite no "yield from client_reader", recebo o seguinte erro quando chega ao ponto em que normalmente travaria.
2014-11-17 03:13:56,214 INFO [try:23] End Connection
2014-11-17 03:13:56,214 ERROR [base_events:912] Task exception was never retrieved
future: <Task finished coro=<handle_client() done, defined at try.py:29> exception=TimeoutError()>
Traceback (most recent call last):
File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 236, in _step
result = next(coro)
File "try.py", line 35, in handle_client
timeout=1.0))
File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 375, in wait_for
raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
Aqui está um exemplo de script mostrando o bug em ação - basta executá-lo no python 3.4.2 e após 9 iterações, ele continuará lendo do cliente.
(O script está completo para que você possa executá-lo para ver por si mesmo)
import asyncio
import logging
import json
import time
log = logging.getLogger(__name__)
clients = {}
def accept_client(client_reader, client_writer):
task = asyncio.Task(handle_client(client_reader, client_wri,ter))
clients[task] = (client_writer)
def client_done(task):
del clients[task]
client_writer.close()
log.info("End Connection")
log.info("New Connection")
task.add_done_callback(client_done)
@asyncio.coroutine
def handle_client(client_reader, client_writer):
data = {'result':{'status':'Connection Ready'}}
postmessage(data,client_writer)
count = 0
while True:
data = (yield from asyncio.wait_for(client_reader.readline(),timeout=1.0))
if not data: #client disconnected
break
data = yield from asyncio.wait_for(test1(),timeout=1.0)
yield from postmessage(data,client_writer)
data = yield from asyncio.wait_for(test2(),timeout=1.0)
yield from postmessage(data,client_writer)
@asyncio.coroutine
def postmessage(data, client_writer):
mimetype=('text/event-stream')
response = ('data: {0}\n\n'.format(data).encode('utf-8'))
client_writer.write(response)
client_writer.drain()
@asyncio.coroutine
def test1():
data = {'result':{
'test1':{ }
}
}
data = json.dumps(data)
return data
@asyncio.coroutine
def test2():
data = {'result':{
'test2':{ }
}
}
data = json.dumps(data)
return data
def main():
loop = asyncio.get_event_loop()
f = asyncio.start_server(accept_client, host=None, port=2991)
loop.run_until_complete(f)
loop.run_forever()
if __name__ == '__main__':
log = logging.getLogger("")
formatter = logging.Formatter("%(asctime)s %(levelname)s " +
"[%(module)s:%(lineno)d] %(message)s")
# log the things
log.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
log.addHandler(ch)
main()
Outra atualização: descobri que ele morre porque lê todas as linhas do cabeçalho do cliente e atinge o tempo limite quando fica sem linhas. Penso que a resposta real que estou procurando é como detectar desconexões de clientes quando você não precisa realmente receber dados do cliente (além da conexão inicial).