Observável de tarefas encadeadas
Estou tentando criar um Observable onde cada item é produzido por meio de uma tarefa assíncrona. O próximo item deve ser produzido por meio de uma chamada assíncrona no resultado do item anterior (co-recursão). Na linguagem "Generate", isso seria algo como isto - exceto queGerar não suporta assíncrono (nem suporta o delegado no estado inicial.
var ob = Observable.Generate(
async () => await ProduceFirst(), // Task<T> ProduceFirst()
prev => Continue(prev) // bool Continue(T);
async prev => await ProduceNext(prev) // Task<T> ProduceNext(T)
item => item
);
Como um exemplo mais concreto, para espiar todas as mensagens de uma fila do ServiceBus, buscando-as 100 mensagens por vez, implemente ProduceFirst, Continue e ProduceNext da seguinte maneira:
Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
const int batchSize = 100;
return _serviceBusReceiver.PeekBatchAsync(batchSize);
}
bool Continue(IEnumerable<BrokeredMessage> prev)
{
return prev.Any();
}
async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
{
const int batchSize = 100;
return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}
E então ligue.SelectMany(i => i)
noIObservable<IEnumerable<BrokeredMessage>>
para transformá-lo em umIObservable<BrokeredMessage>
Onde _serviceBusReceiver é uma instância de uma interface da seguinte maneira:
public interface IServiceBusReceiver
{
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}
E BrokeredMessage é dehttps://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx