Asyncio detecta que la desconexión se cuelga

Estoy usando Asyncio en Python 3.4, intentaré explicar lo que estoy haciendo hasta este punto y lo que (creo) está causando el problema.

En un extremo tengo un marco de conexión UDP con operaciones de bloqueo, tomo los datos que obtengo de esta secuencia y creo json que paso al cliente en formato SSE. Todo esto está funcionando muy bien.

El problema con el que me encuentro es que no puedo hacer que maneje las desconexiones del cliente correctamente si no hago nada y un cliente se desconecta, comenzaré a recibir este error:

WARNING [selector_events:613] socket.send() raised exception.

Como el ciclo aún se está ejecutando, he estado buscando formas de romperlo y activar el .close (), pero me encuentro con problemas con los ejemplos que he encontrado y no hay muchos recursos en línea.

El único ejemplo que parece funcionar realmente es tratar de leer una línea del cliente y si es una cadena vacía, eso significa que el cliente se desconectó.

    while True:
        data = (yield from client_reader.readline())
        if not data: #client disconnected
            break

sin embargo, después de aproximadamente diez mensajes, todos los mensajes para el cliente se detienen, creo que esto se debe a que está colgando en "data = (rendimiento de client_reader.readline ())" después de que se cuelga si cierro el cliente, entonces se cierra correctamente y "Fin Conexión "se llama. ¿Alguna idea de por qué podría estar colgando? Creo que tengo un buen manejo de Asyncio en este momento, pero este me está confundiendo.

Nota: ubicación () y estado () son mis dos llamadas para obtener información del zócalo UDP: las ejecuté sin problemas durante muchas horas con este mismo código, menos las líneas de desconexión del cliente.

clients = {}

def accept_client(client_reader, client_writer):
    task = asyncio.Task(handle_client(client_reader, client_writer))
    clients[task] = (client_writer)


def client_done(task):
    del clients[task]
    client_writer.close()
    log.info("End Connection")

log.info("New Connection")
task.add_done_callback(client_done)

@asyncio.coroutine
def handle_client(client_reader, client_writer):
    data = {'result':{'status':'Connection Ready'}}
    yield from postmessage(data,client_writer)
    while True:
        data = (yield from client_reader.readline())
        if not data: #client disconnected
            break
        data = yield from asyncio.wait_for(location(),
                                           timeout=1.0)
        yield from postmessage(data,client_writer)

        data = yield from asyncio.wait_for(status(),
                                           timeout=1.0)
        yield from postmessage(data,client_writer)

@asyncio.coroutine
def postmessage(data, client_writer):
        mimetype=('text/event-stream')
        response = ('data: {0}\n\n'.format(data).encode('utf-8'))
        client_writer.write(response)
        client_writer.drain()

Actualización: si agrego un tiempo de espera en el "rendimiento de client_reader", aparece el siguiente error cuando llega al punto en que normalmente se colgaría.

2014-11-17 03:13:56,214 INFO [try:23] End Connection
2014-11-17 03:13:56,214 ERROR [base_events:912] Task exception was never retrieved
future: <Task finished coro=<handle_client() done, defined at try.py:29> exception=TimeoutError()>
Traceback (most recent call last):
  File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 236, in _step
    result = next(coro)
  File "try.py", line 35, in handle_client
    timeout=1.0))
  File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 375, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

Aquí hay una secuencia de comandos de muestra que muestra el error en acción: simplemente ejecútelo en Python 3.4.2 y después de 9 iteraciones se suspenderá en la lectura del cliente.

(El script está completo para que pueda ejecutarlo y verlo por sí mismo)

import asyncio
import logging

import json
import time

log = logging.getLogger(__name__)

clients = {}

def accept_client(client_reader, client_writer):
    task = asyncio.Task(handle_client(client_reader, client_wri,ter))
    clients[task] = (client_writer)

    def client_done(task):
        del clients[task]
        client_writer.close()
        log.info("End Connection")

    log.info("New Connection")
    task.add_done_callback(client_done)


@asyncio.coroutine
def handle_client(client_reader, client_writer):
    data = {'result':{'status':'Connection Ready'}}
    postmessage(data,client_writer)
    count = 0
    while True:
        data = (yield from asyncio.wait_for(client_reader.readline(),timeout=1.0))
        if not data: #client disconnected
            break

        data = yield from asyncio.wait_for(test1(),timeout=1.0)
        yield from postmessage(data,client_writer)

        data = yield from asyncio.wait_for(test2(),timeout=1.0)
        yield from postmessage(data,client_writer)

@asyncio.coroutine
def postmessage(data, client_writer):
        mimetype=('text/event-stream')
        response = ('data: {0}\n\n'.format(data).encode('utf-8'))
        client_writer.write(response)
        client_writer.drain()

@asyncio.coroutine
def test1():
        data = {'result':{
                        'test1':{ }
                    }
                }
        data = json.dumps(data)
        return data

@asyncio.coroutine
def test2():
        data = {'result':{ 
                    'test2':{ }
                    }
                }
        data = json.dumps(data)
        return data 

def main():
    loop = asyncio.get_event_loop()
    f = asyncio.start_server(accept_client, host=None, port=2991)
    loop.run_until_complete(f)
    loop.run_forever()

if __name__ == '__main__':
    log = logging.getLogger("")
    formatter = logging.Formatter("%(asctime)s %(levelname)s " +
                                  "[%(module)s:%(lineno)d] %(message)s")
    # log the things
    log.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)

    ch.setFormatter(formatter)
    log.addHandler(ch)
    main()

Otra actualización: he encontrado que muere porque lee todas las líneas del encabezado del cliente y luego agota el tiempo de espera cuando se queda sin líneas. Creo que la respuesta real que estoy buscando es cómo detectar las desconexiones del cliente cuando en realidad no necesita recibir datos del cliente (aparte de la conexión inicial).

Respuestas a la pregunta(3)

Su respuesta a la pregunta