Com Rx, como ignoro o valor all-except-the-latest quando meu método de inscrição está em execução
UsandoExtensões reativas, Quero ignorar as mensagens provenientes do meu fluxo de eventos que ocorrem enquanto o meuSubscribe
método está em execução. Ou seja às vezes, demora mais para processar uma mensagem do que o tempo entre a mensagem, por isso quero descartar as mensagens que não tenho tempo para processar.
No entanto, quando meuSubscribe
método conclui, se alguma mensagem veio através eu quero processar o último. Então eu sempre processo a mensagem mais recente.
Então, se eu tenho algum código que faz:
messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);
e se assumirmos que o '100' demora muito para processar. Então eu quero que o '2' seja processado quando o '100' for concluído. O '1' deve ser ignorado porque foi substituído pelo '2' enquanto o '100' ainda estava sendo processado.
Aqui está um exemplo do resultado que eu quero usar uma tarefa em segundo plano eLatest()
var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));
Task.Factory.StartNew(() =>
{
foreach(var n in messages.Latest())
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.WriteLine(n);
}
});
No entanto, Latest () é uma chamada de bloqueio e eu prefiro não ter um thread em espera pelo próximo valor como este (algumas vezes haverá muito tempo entre as mensagens).
Eu também posso obter o resultado que quero usando umBroadcastBlock
deTPL Dataflow, como isso:
var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));
buffer.AsObservable()
.Subscribe(n =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.WriteLine(n);
});
mas parece que isso deveria ser possível diretamente em Rx. Qual é a melhor maneira de fazer isso?