W Rx, jak zignorować wartość all-except-the-latest, gdy uruchomiona jest moja metoda Subskrybowania

Za pomocąRozszerzenia reaktywne, Chcę ignorować wiadomości pochodzące z mojego strumienia zdarzeń, które występują podczas mojegoSubscribe metoda jest uruchomiona. To znaczy. Przetwarzanie wiadomości zajmuje mi więcej czasu niż czas między wiadomościami, więc chcę upuścić wiadomości, których nie mam czasu przetworzyć.

Jednak kiedy mojaSubscribe metoda się kończy, jeśli jakieś wiadomości dotarły, chcę przetworzyć ostatnią. Dlatego zawsze przetwarzam najnowszą wiadomość.

Więc jeśli mam jakiś kod, który robi:

messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);

a jeśli przyjmiemy, że przetwarzanie „100” zajmuje dużo czasu. Następnie chcę, aby „2” było przetwarzane po zakończeniu „100”. „1” należy zignorować, ponieważ zostało ono zastąpione przez „2”, podczas gdy „100” było nadal przetwarzane.

Oto przykład wyniku, którego chcę użyć przy użyciu zadania w tle iLatest()

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

Task.Factory.StartNew(() =>
{
    foreach(var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});

Jednak Latest () jest połączeniem blokującym i wolałbym, żeby wątek nie czekał na następną wartość taką jak ta (czasami będą bardzo długie przerwy między wiadomościami).

Mogę także uzyskać wynik, którego chcę, używającBroadcastBlock zPrzepływ danych TPL, lubię to:

var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));

buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });

ale wydaje się, że powinno być możliwe bezpośrednio w Rx. Jaki jest najlepszy sposób, aby to zrobić?

questionAnswers(9)

yourAnswerToTheQuestion