Con Rx, ¿cómo ignoro el valor de todos excepto el último cuando mi método de suscripción se está ejecutando?

UtilizandoExtensiones reactivas, Quiero ignorar los mensajes provenientes de mi flujo de eventos que ocurren mientras miSubscribe El método se está ejecutando. Es decir. a veces me lleva más tiempo procesar un mensaje que el tiempo entre mensajes, por lo que quiero eliminar los mensajes que no tengo tiempo para procesar.

Sin embargo, cuando miSubscribe El método se completa, si alguno de los mensajes llegó, quiero procesar el último. Por eso siempre trato el mensaje más reciente.

Por lo tanto, si tengo algún código que hace:

messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);

y si asumimos que el '100' tarda mucho tiempo en procesarse. Luego quiero que se procese el '2' cuando se complete el '100'. El '1' debe ignorarse porque fue reemplazado por el '2' mientras que el '100' aún estaba siendo procesado.

Aquí hay un ejemplo del resultado que quiero usar una tarea en segundo plano yLatest()

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

Task.Factory.StartNew(() =>
{
    foreach(var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});

Sin embargo, Latest () es una llamada de bloqueo y prefiero no tener un hilo esperando el siguiente valor como este (a veces habrá espacios muy largos entre los mensajes).

También puedo obtener el resultado que quiero usando unaBroadcastBlock desdeFlujo de datos TPL, Me gusta esto:

var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));

buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });

pero esto parece que debería ser posible directamente en Rx. ¿Cuál es la mejor manera de hacerlo?

Respuestas a la pregunta(9)

Su respuesta a la pregunta