Asyncio обнаруживает разрыв соединения

Я использую Asyncio в Python 3.4, я попытаюсь объяснить, что я делаю до этого момента и что я (думаю) вызывает проблему.

С одной стороны, у меня есть структура соединения UDP с операциями блокировки, я беру данные, полученные из этого потока, и создаю json, который я передаю клиенту в формате SSE. Это все работает отлично.

Проблема, с которой я сталкиваюсь, заключается в том, что я не могу заставить его обрабатывать клиентские разъединения должным образом, если я ничего не делаю, а клиентские разъединения я начну получать эту ошибку:

WARNING [selector_events:613] socket.send() raised exception.

Так как цикл все еще работает, я искал способы аккуратно разорвать цикл и вызвать .close (), но у меня возникают проблемы с примерами, которые я нашел, и в сети не так много ресурсов.

Один пример, который действительно работает, - это попытка прочитать строку с клиента, и если это пустая строка, это означает, что клиент отключен.

    while True:
        data = (yield from client_reader.readline())
        if not data: #client disconnected
            break

однако после примерно десяти сообщений все сообщения клиенту останавливаются, я думаю, это происходит потому, что он висит на «data = (yield from client_reader.readline ())» после того, как он зависает, если я закрываю клиент, тогда он корректно завершает работу и «завершается». Соединение "действительно вызывается. Есть идеи, почему он может висеть? Я думаю, что на данный момент у меня довольно хорошие навыки работы с Asyncio, но этот вопрос меня озадачивает.

Примечание: location () и status () - это два моих вызова для получения информации из UDP-сокета - я успешно запускал их без проблем в течение многих часов с одним и тем же кодом - за исключением линий отключения клиента.

clients = {}

def accept_client(client_reader, client_writer):
    task = asyncio.Task(handle_client(client_reader, client_writer))
    clients[task] = (client_writer)


def client_done(task):
    del clients[task]
    client_writer.close()
    log.info("End Connection")

log.info("New Connection")
task.add_done_callback(client_done)

@asyncio.coroutine
def handle_client(client_reader, client_writer):
    data = {'result':{'status':'Connection Ready'}}
    yield from postmessage(data,client_writer)
    while True:
        data = (yield from client_reader.readline())
        if not data: #client disconnected
            break
        data = yield from asyncio.wait_for(location(),
                                           timeout=1.0)
        yield from postmessage(data,client_writer)

        data = yield from asyncio.wait_for(status(),
                                           timeout=1.0)
        yield from postmessage(data,client_writer)

@asyncio.coroutine
def postmessage(data, client_writer):
        mimetype=('text/event-stream')
        response = ('data: {0}\n\n'.format(data).encode('utf-8'))
        client_writer.write(response)
        client_writer.drain()

Обновление: если я добавлю тайм-аут в "yield from client_reader", я получу следующую ошибку, когда он достигнет точки, в которой он обычно зависает.

2014-11-17 03:13:56,214 INFO [try:23] End Connection
2014-11-17 03:13:56,214 ERROR [base_events:912] Task exception was never retrieved
future: <Task finished coro=<handle_client() done, defined at try.py:29> exception=TimeoutError()>
Traceback (most recent call last):
  File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 236, in _step
    result = next(coro)
  File "try.py", line 35, in handle_client
    timeout=1.0))
  File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 375, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

Вот пример сценария, показывающий ошибку в действии - просто запустите ее на python 3.4.2 и после 9 итераций он зависнет при чтении с клиента.

(Сценарий завершен, поэтому вы можете запустить его, чтобы убедиться в этом)

import asyncio
import logging

import json
import time

log = logging.getLogger(__name__)

clients = {}

def accept_client(client_reader, client_writer):
    task = asyncio.Task(handle_client(client_reader, client_writer))
    clients[task] = (client_writer)

    def client_done(task):
        del clients[task]
        client_writer.close()
        log.info("End Connection")

    log.info("New Connection")
    task.add_done_callback(client_done)


@asyncio.coroutine
def handle_client(client_reader, client_writer):
    data = {'result':{'status':'Connection Ready'}}
    postmessage(data,client_writer)
    count = 0
    while True:
        data = (yield from asyncio.wait_for(client_reader.readline(),timeout=1.0))
        if not data: #client disconnected
            break

        data = yield from asyncio.wait_for(test1(),timeout=1.0)
        yield from postmessage(data,client_writer)

        data = yield from asyncio.wait_for(test2(),timeout=1.0)
        yield from postmessage(data,client_writer)

@asyncio.coroutine
def postmessage(data, client_writer):
        mimetype=('text/event-stream')
        response = ('data: {0}\n\n'.format(data).encode('utf-8'))
        client_writer.write(response)
        client_writer.drain()

@asyncio.coroutine
def test1():
        data = {'result':{
                        'test1':{ }
                    }
                }
        data = json.dumps(data)
        return data

@asyncio.coroutine
def test2():
        data = {'result':{ 
                    'test2':{ }
                    }
                }
        data = json.dumps(data)
        return data 

def main():
    loop = asyncio.get_event_loop()
    f = asyncio.start_server(accept_client, host=None, port=2991)
    loop.run_until_complete(f)
    loop.run_forever()

if __name__ == '__main__':
    log = logging.getLogger("")
    formatter = logging.Formatter("%(asctime)s %(levelname)s " +
                                  "[%(module)s:%(lineno)d] %(message)s")
    # log the things
    log.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)

    ch.setFormatter(formatter)
    log.addHandler(ch)
    main()

Другое обновление: я обнаружил, что он умирает, потому что он читает все строки из заголовка клиента, а затем истекает время ожидания, когда у него заканчиваются строки. Я думаю, что реальный ответ, который я ищу, состоит в том, как обнаружить клиентские разъединения, когда вам на самом деле не нужно получать данные от клиента (кроме первоначального соединения).

Ответы на вопрос(3)

Ваш ответ на вопрос