Asyncio erkennt Unterbrechung hängt

Ich verwende Asyncio in Python 3.4. Ich werde versuchen zu erklären, was ich bis jetzt mache und was ich (glaube) das Problem verursacht.

An einem Ende habe ich ein UDP-Verbindungsframework mit Blockierungsoperationen. Ich nehme die Daten, die ich aus diesem Stream erhalte, und erstelle json, das ich im SSE-Format an den Client weitergebe. Das funktioniert alles super.

Das Problem, auf das ich stoße, ist, dass ich es nicht dazu bringen kann, Client-Verbindungsabbrüche richtig zu handhaben, wenn ich nichts tue und ein Client die Verbindung abbrich

WARNING [selector_events:613] socket.send() raised exception.

seit die Schleife noch läuft, habe ich nach Wegen gesucht, die Schleife sauber zu unterbrechen und die .close () auszulösen, aber ich stoße auf Probleme mit den Beispielen, die ich gefunden habe, und es sind nicht viele Ressourcen online.

Das eine Beispiel, das tatsächlich zu funktionieren scheint, ist der Versuch, eine Zeile vom Client zu lesen. Wenn es sich um eine leere Zeichenfolge handelt, bedeutet dies, dass der Client nicht verbunden ist.

    while True:
        data = (yield from client_reader.readline())
        if not data: #client disconnected
            break

jedoch nach etwa zehn Nachrichten alle Nachrichten an den Client stoppen, ich denke, das liegt daran, dass es an "data = (yield from client_reader.readline ())" hängt, nachdem es hängt, wenn ich den Client schließe, wird es ordnungsgemäß heruntergefahren und " "Verbindung beenden" wird aufgerufen. Irgendwelche Ideen, warum es hängen könnte? Ich glaube, ich habe Asyncio zu diesem Zeitpunkt ziemlich gut im Griff, aber dieser verwirrt mich.

Hinweis: location () und status () sind meine beiden Aufrufe, um Informationen vom UDP-Socket abzurufen. Ich habe sie mit demselben Code viele Stunden lang ohne Probleme ausgeführt - abzüglich der Client-Verbindungsunterbrechungen.

clients = {}

def accept_client(client_reader, client_writer):
    task = asyncio.Task(handle_client(client_reader, client_writer))
    clients[task] = (client_writer)


def client_done(task):
    del clients[task]
    client_writer.close()
    log.info("End Connection")

log.info("New Connection")
task.add_done_callback(client_done)

@asyncio.coroutine
def handle_client(client_reader, client_writer):
    data = {'result':{'status':'Connection Ready'}}
    yield from postmessage(data,client_writer)
    while True:
        data = (yield from client_reader.readline())
        if not data: #client disconnected
            break
        data = yield from asyncio.wait_for(location(),
                                           timeout=1.0)
        yield from postmessage(data,client_writer)

        data = yield from asyncio.wait_for(status(),
                                           timeout=1.0)
        yield from postmessage(data,client_writer)

@asyncio.coroutine
def postmessage(data, client_writer):
        mimetype=('text/event-stream')
        response = ('data: {0}\n\n'.format(data).encode('utf-8'))
        client_writer.write(response)
        client_writer.drain()

Update: Wenn ich eine Zeitüberschreitung für "yield from client_reader" hinzufüge, wird der folgende Fehler angezeigt, wenn der Punkt erreicht wird, an dem er normalerweise hängen würde.

2014-11-17 03:13:56,214 INFO [try:23] End Connection
2014-11-17 03:13:56,214 ERROR [base_events:912] Task exception was never retrieved
future: <Task finished coro=<handle_client() done, defined at try.py:29> exception=TimeoutError()>
Traceback (most recent call last):
  File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 236, in _step
    result = next(coro)
  File "try.py", line 35, in handle_client
    timeout=1.0))
  File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 375, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

Hier ist ein Beispielskript, das den Fehler in Aktion zeigt - führen Sie es einfach in Python 3.4.2 aus und nach 9 Iterationen bleibt es beim Lesen vom Client hängen.

(Das Skript ist vollständig, sodass Sie es ausführen können, um sich selbst davon zu überzeugen.)

import asyncio
import logging

import json
import time

log = logging.getLogger(__name__)

clients = {}

def accept_client(client_reader, client_writer):
    task = asyncio.Task(handle_client(client_reader, client_wri,ter))
    clients[task] = (client_writer)

    def client_done(task):
        del clients[task]
        client_writer.close()
        log.info("End Connection")

    log.info("New Connection")
    task.add_done_callback(client_done)


@asyncio.coroutine
def handle_client(client_reader, client_writer):
    data = {'result':{'status':'Connection Ready'}}
    postmessage(data,client_writer)
    count = 0
    while True:
        data = (yield from asyncio.wait_for(client_reader.readline(),timeout=1.0))
        if not data: #client disconnected
            break

        data = yield from asyncio.wait_for(test1(),timeout=1.0)
        yield from postmessage(data,client_writer)

        data = yield from asyncio.wait_for(test2(),timeout=1.0)
        yield from postmessage(data,client_writer)

@asyncio.coroutine
def postmessage(data, client_writer):
        mimetype=('text/event-stream')
        response = ('data: {0}\n\n'.format(data).encode('utf-8'))
        client_writer.write(response)
        client_writer.drain()

@asyncio.coroutine
def test1():
        data = {'result':{
                        'test1':{ }
                    }
                }
        data = json.dumps(data)
        return data

@asyncio.coroutine
def test2():
        data = {'result':{ 
                    'test2':{ }
                    }
                }
        data = json.dumps(data)
        return data 

def main():
    loop = asyncio.get_event_loop()
    f = asyncio.start_server(accept_client, host=None, port=2991)
    loop.run_until_complete(f)
    loop.run_forever()

if __name__ == '__main__':
    log = logging.getLogger("")
    formatter = logging.Formatter("%(asctime)s %(levelname)s " +
                                  "[%(module)s:%(lineno)d] %(message)s")
    # log the things
    log.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)

    ch.setFormatter(formatter)
    log.addHandler(ch)
    main()

Ein weiteres Update: Ich habe festgestellt, dass es stirbt, weil es alle Zeilen aus dem Header des Clients liest und dann eine Zeitüberschreitung feststellt, wenn keine Zeilen mehr vorhanden sind. Ich denke, die eigentliche Antwort, die ich suche, ist, wie man Client-Verbindungsabbrüche erkennt, wenn Sie keine Daten vom Client erhalten müssen (außer der ursprünglichen Verbindung).

Antworten auf die Frage(6)

Ihre Antwort auf die Frage