Wie kann ich mit Rx alle Werte außer dem neuesten ignorieren, wenn meine Subscribe-Methode ausgeführt wird?

VerwendenReaktive ErweiterungenIch möchte Nachrichten aus meinem Ereignisstrom ignorieren, die auftreten, während meinSubscribe Methode läuft. Das heißt Das Verarbeiten einer Nachricht dauert manchmal länger als die Zeit zwischen den Nachrichten. Daher möchte ich die Nachrichten löschen, für deren Verarbeitung ich keine Zeit habe.

Wenn jedoch meineSubscribe Die Methode ist abgeschlossen. Wenn Nachrichten eingegangen sind, möchte ich die letzte verarbeiten. So verarbeite ich immer die aktuellste Nachricht.

Wenn ich also Code habe, der Folgendes bewirkt:

messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);

und wenn wir annehmen, dass die '100' eine lange Zeit in Anspruch nimmt, um zu verarbeiten. Dann möchte ich, dass die '2' verarbeitet wird, wenn die '100' abgeschlossen ist. Die '1' sollte ignoriert werden, da sie von der '2' abgelöst wurde, während die '100' noch verarbeitet wurde.

Hier ist ein Beispiel für das Ergebnis, das ich mit einer Hintergrundaufgabe und verwenden möchteLatest()

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

Task.Factory.StartNew(() =>
{
    foreach(var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});

Allerdings ist Latest () ein blockierender Aufruf und ich würde es vorziehen, wenn kein Thread auf den nächsten Wert wie diesen wartet (es wird manchmal sehr lange Lücken zwischen Nachrichten geben).

Ich kann das gewünschte Ergebnis auch mit a erzielenBroadcastBlock vonTPL-Datenfluss, so was:

var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));

buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });

aber das fühlt sich so an, als ob es direkt in Rx möglich sein sollte. Was ist der beste Weg, dies zu tun?

Antworten auf die Frage(9)

Ihre Antwort auf die Frage