Observável de tarefas encadeadas

Estou tentando criar um Observable onde cada item é produzido por meio de uma tarefa assíncrona. O próximo item deve ser produzido por meio de uma chamada assíncrona no resultado do item anterior (co-recursão). Na linguagem "Generate", isso seria algo como isto - exceto queGerar não suporta assíncrono (nem suporta o delegado no estado inicial.

var ob = Observable.Generate(
   async () => await ProduceFirst(),        // Task<T> ProduceFirst()
   prev => Continue(prev)                   // bool Continue(T);
   async prev => await ProduceNext(prev)    // Task<T> ProduceNext(T)
   item => item
);

Como um exemplo mais concreto, para espiar todas as mensagens de uma fila do ServiceBus, buscando-as 100 mensagens por vez, implemente ProduceFirst, Continue e ProduceNext da seguinte maneira:

Task<IEnumerable<BrokeredMessage>> ProduceFirst() 
{
    const int batchSize = 100;
    return _serviceBusReceiver.PeekBatchAsync(batchSize);
}

bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev) 
{
    const int batchSize = 100;
    return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}

E então ligue.SelectMany(i => i) noIObservable<IEnumerable<BrokeredMessage>> para transformá-lo em umIObservable<BrokeredMessage>

Onde _serviceBusReceiver é uma instância de uma interface da seguinte maneira:

public interface IServiceBusReceiver
{
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}

E BrokeredMessage é dehttps://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx

questionAnswers(4)

yourAnswerToTheQuestion