Observable desde tareas encadenadas
Estoy tratando de crear un Observable donde cada elemento se produce a través de una tarea asincrónica. El siguiente elemento debe producirse mediante una llamada asíncrona en el resultado del elemento anterior (co-recursión). En el lenguaje "Generar" esto se vería así, excepto queGenerar no admite asíncrono (ni es compatible con el delegado en el estado inicial.
var ob = Observable.Generate(
async () => await ProduceFirst(), // Task<T> ProduceFirst()
prev => Continue(prev) // bool Continue(T);
async prev => await ProduceNext(prev) // Task<T> ProduceNext(T)
item => item
);
Como ejemplo más concreto, para ver todos los mensajes de una cola de ServiceBus llevándolos 100 mensajes a la vez, implemente ProduceFirst, Continue y ProduceNext de la siguiente manera:
Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
const int batchSize = 100;
return _serviceBusReceiver.PeekBatchAsync(batchSize);
}
bool Continue(IEnumerable<BrokeredMessage> prev)
{
return prev.Any();
}
async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
{
const int batchSize = 100;
return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}
Y luego llama.SelectMany(i => i)
sobre elIObservable<IEnumerable<BrokeredMessage>>
para convertirlo en unIObservable<BrokeredMessage>
Donde _serviceBusReceiver es una instancia de una interfaz de la siguiente manera:
public interface IServiceBusReceiver
{
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}
Y BrokeredMessage es dehttps://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx