Наблюдаемый от Цепных Задач
Я пытаюсь создать Observable, где каждый элемент создается с помощью асинхронной задачи. Следующий элемент должен быть произведен посредством асинхронного вызова на результат предыдущего элемента (ко-рекурсия). На языке «Generate» это будет выглядеть примерно так - за исключением того, чтоГенерация не поддерживает асинхронный (и при этом это не поддерживает делегата в начальном состоянии.
var ob = Observable.Generate(
async () => await ProduceFirst(), // Task<T> ProduceFirst()
prev => Continue(prev) // bool Continue(T);
async prev => await ProduceNext(prev) // Task<T> ProduceNext(T)
item => item
);
В качестве более конкретного примера, чтобы просмотреть все сообщения из очереди ServiceBus, извлекая их по 100 сообщений за раз, реализуйте ProduceFirst, Continue и ProduceNext следующим образом:
Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
const int batchSize = 100;
return _serviceBusReceiver.PeekBatchAsync(batchSize);
}
bool Continue(IEnumerable<BrokeredMessage> prev)
{
return prev.Any();
}
async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
{
const int batchSize = 100;
return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}
А потом позвони.SelectMany(i => i)
наIObservable<IEnumerable<BrokeredMessage>>
превратить это вIObservable<BrokeredMessage>
Где _serviceBusReceiver является экземпляром интерфейса следующим образом:
public interface IServiceBusReceiver
{
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}
И BrokeredMessage изhttps://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx