С Rx, как я игнорирую значение «все, кроме самого последнего», когда мой метод подписки работает
С помощьюРеактивные расширенияЯ хочу игнорировать сообщения, поступающие из моего потока событий, которые происходят, пока мойSubscribe
метод работает. То есть иногда обработка сообщения занимает больше времени, чем промежуток времени между сообщениями, поэтому я хочу отбросить сообщения, которые у меня нет времени на обработку.
Тем не менее, когда мойSubscribe
Метод завершается, если какие-либо сообщения поступили, я хочу обработать последнее. Поэтому я всегда обрабатываю самое последнее сообщение.
Итак, если у меня есть код, который делает:
messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);
и если мы примем «100»; занимает много времени для обработки. Тогда я хочу, чтобы "2" быть обработанным, когда «100»; завершается. '1' следует игнорировать, потому что он был заменен '2' в то время как '100' все еще обрабатывался.
Вот пример результата, который я хочу использовать с помощью фоновой задачи иLatest()
var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));
Task.Factory.StartNew(() =>
{
foreach(var n in messages.Latest())
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.WriteLine(n);
}
});
Однако Latest () является блокирующим вызовом, и я предпочел бы, чтобы поток не ожидал следующего значения, подобного этому (иногда между сообщениями будут очень большие промежутки).
Я также могу получить желаемый результат, используяBroadcastBlock
отПоток данных TPL, как это:
var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));
buffer.AsObservable()
.Subscribe(n =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.WriteLine(n);
});
но такое чувство, что это должно быть возможно прямо в Rx. Каков наилучший способ сделать это?