Von verketteten Tasks aus beobachtbar

Ich versuche, eine Observable zu erstellen, in der jedes Element über eine asynchrone Task erstellt wird. Das nächste Element sollte über einen asynchronen Aufruf des Ergebnisses des vorherigen Elements erstellt werden (Co-Rekursion). In "Generate" würde dies ungefähr so aussehen - außer dassGenerate unterstützt kein asynchrones (unterstützt den Delegaten im Ausgangszustand auch nicht.

var ob = Observable.Generate(
   async () => await ProduceFirst(),        // Task<T> ProduceFirst()
   prev => Continue(prev)                   // bool Continue(T);
   async prev => await ProduceNext(prev)    // Task<T> ProduceNext(T)
   item => item
);

Implementieren Sie ProduceFirst, Continue und ProduceNext wie folgt, um alle Nachrichten aus einer ServiceBus-Warteschlange durch Abrufen von jeweils 100 Nachrichten zu sehen:

Task<IEnumerable<BrokeredMessage>> ProduceFirst() 
{
    const int batchSize = 100;
    return _serviceBusReceiver.PeekBatchAsync(batchSize);
}

bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev) 
{
    const int batchSize = 100;
    return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}

Und dann ruf @ .SelectMany(i => i) auf derIObservable<IEnumerable<BrokeredMessage>>, um es in ein @ zu verwandeIObservable<BrokeredMessage>

Wo _serviceBusReceiver eine Instanz einer Schnittstelle wie folgt ist:

public interface IServiceBusReceiver
{
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}

And BrokeredMessage ist vonhttps: //msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.asp

Antworten auf die Frage(8)

Ihre Antwort auf die Frage