Наблюдаемый от Цепных Задач

Я пытаюсь создать Observable, где каждый элемент создается с помощью асинхронной задачи. Следующий элемент должен быть произведен посредством асинхронного вызова на результат предыдущего элемента (ко-рекурсия). На языке «Generate» это будет выглядеть примерно так - за исключением того, чтоГенерация не поддерживает асинхронный (и при этом это не поддерживает делегата в начальном состоянии.

var ob = Observable.Generate(
   async () => await ProduceFirst(),        // Task<T> ProduceFirst()
   prev => Continue(prev)                   // bool Continue(T);
   async prev => await ProduceNext(prev)    // Task<T> ProduceNext(T)
   item => item
);

В качестве более конкретного примера, чтобы просмотреть все сообщения из очереди ServiceBus, извлекая их по 100 сообщений за раз, реализуйте ProduceFirst, Continue и ProduceNext следующим образом:

Task<IEnumerable<BrokeredMessage>> ProduceFirst() 
{
    const int batchSize = 100;
    return _serviceBusReceiver.PeekBatchAsync(batchSize);
}

bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev) 
{
    const int batchSize = 100;
    return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}

А потом позвони.SelectMany(i => i) наIObservable<IEnumerable<BrokeredMessage>> превратить это вIObservable<BrokeredMessage>

Где _serviceBusReceiver является экземпляром интерфейса следующим образом:

public interface IServiceBusReceiver
{
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}

И BrokeredMessage изhttps://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx

Ответы на вопрос(4)

Ваш ответ на вопрос