Com Rx, como ignoro o valor all-except-the-latest quando meu método de inscrição está em execução

UsandoExtensões reativas, Quero ignorar as mensagens provenientes do meu fluxo de eventos que ocorrem enquanto o meuSubscribe método está em execução. Ou seja às vezes, demora mais para processar uma mensagem do que o tempo entre a mensagem, por isso quero descartar as mensagens que não tenho tempo para processar.

No entanto, quando meuSubscribe método conclui, se alguma mensagem veio através eu quero processar o último. Então eu sempre processo a mensagem mais recente.

Então, se eu tenho algum código que faz:

messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);

e se assumirmos que o '100' demora muito para processar. Então eu quero que o '2' seja processado quando o '100' for concluído. O '1' deve ser ignorado porque foi substituído pelo '2' enquanto o '100' ainda estava sendo processado.

Aqui está um exemplo do resultado que eu quero usar uma tarefa em segundo plano eLatest()

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

Task.Factory.StartNew(() =>
{
    foreach(var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});

No entanto, Latest () é uma chamada de bloqueio e eu prefiro não ter um thread em espera pelo próximo valor como este (algumas vezes haverá muito tempo entre as mensagens).

Eu também posso obter o resultado que quero usando umBroadcastBlock deTPL Dataflow, como isso:

var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));

buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });

mas parece que isso deveria ser possível diretamente em Rx. Qual é a melhor maneira de fazer isso?

questionAnswers(9)

yourAnswerToTheQuestion