С Rx, как я игнорирую значение «все, кроме самого последнего», когда мой метод подписки работает

С помощьюРеактивные расширенияЯ хочу игнорировать сообщения, поступающие из моего потока событий, которые происходят, пока мойSubscribe метод работает. То есть иногда обработка сообщения занимает больше времени, чем промежуток времени между сообщениями, поэтому я хочу отбросить сообщения, которые у меня нет времени на обработку.

Тем не менее, когда мойSubscribe Метод завершается, если какие-либо сообщения поступили, я хочу обработать последнее. Поэтому я всегда обрабатываю самое последнее сообщение.

Итак, если у меня есть код, который делает:

messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);

и если мы примем «100»; занимает много времени для обработки. Тогда я хочу, чтобы "2" быть обработанным, когда «100»; завершается. '1' следует игнорировать, потому что он был заменен '2' в то время как '100' все еще обрабатывался.

Вот пример результата, который я хочу использовать с помощью фоновой задачи иLatest()

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

Task.Factory.StartNew(() =>
{
    foreach(var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});

Однако Latest () является блокирующим вызовом, и я предпочел бы, чтобы поток не ожидал следующего значения, подобного этому (иногда между сообщениями будут очень большие промежутки).

Я также могу получить желаемый результат, используяBroadcastBlock отПоток данных TPL, как это:

var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));

buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });

но такое чувство, что это должно быть возможно прямо в Rx. Каков наилучший способ сделать это?

Ответы на вопрос(9)

Ваш ответ на вопрос