Ах, ха! Я знал, что должен быть относительно прямой способ сделать это через состав существующих операторов.

я есть наблюдаемая последовательность, которая производит события в быстрых пакетах (то есть: пять событий одно за другим, затем длинная задержка, затем еще один быстрый взрыв событий и т. Д.). Я хочу сгладить эти всплески, вставив небольшую задержку между событиями. Представьте следующую диаграмму в качестве примера:

Raw:      --oooo--------------ooooo-----oo----------------ooo|
Buffered: --o--o--o--o--------o--o--o--o--o--o--o---------o--o--o|

Мой текущий подход состоит в том, чтобы генерировать метроном, как таймер с помощьюObservable.Interval() это сигнализирует, когда можно извлечь другое событие из необработанного потока. Проблема в том, что я не могу понять, как затем объединить этот таймер с моей необработанной небуферизованной наблюдаемой последовательностью.

IObservable.Zip() близок к тому, что я хочу, но он работает только до тех пор, пока сырой поток генерирует события быстрее, чем таймер. Как только в необработанном потоке возникает значительное затишье, таймер создает серию нежелательных событий, которые затем сразу связываются со следующим всплеском событий из необработанного потока.

В идеале мне нужен метод расширения IObservable со следующей сигнатурой функции, которая создает поведение, которое я обрисовал выше. Теперь приходите ко мне на помощь StackOverflow :)

public static IObservable<T> Buffered(this IObservable<T> src, TimeSpan minDelay)

PS. Я новичок в Rx, поэтому приношу свои извинения, если это тривиально простой вопрос ...

1. Простой, но некорректный подход

Вот мое первоначальное наивное и упрощенное решение, которое имеет довольно много проблем:

public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay)
{
    Queue<T> q = new Queue<T>();
    source.Subscribe(x => q.Enqueue(x));
    return Observable.Interval(minDelay).Where(_ => q.Count > 0).Select(_ => q.Dequeue());
}

Первая очевидная проблема с этим состоит в том, что IDisposable, возвращенный внутренней подпиской на необработанный источник, потерян, и поэтому подписка не может быть прекращена. Вызов Dispose для IDisposable, возвращаемого этим методом, убивает таймер, но не базовый канал необработанных событий, который теперь без необходимости заполняет очередь, и никто не может извлечь события из очереди.

Вторая проблема заключается в том, что невозможно исключить или распространять уведомления об окончании потока из необработанного потока событий в буферный поток - они просто игнорируются при подписке на необработанный источник.

И наконец, что не менее важно, теперь у меня есть код, который периодически просыпается независимо от того, есть ли на самом деле какая-либо работа, которую я предпочел бы избегать в этом замечательном новом реактивном мире.

2. Слишком сложный подход

Чтобы решить проблемы, возникшие в моем первоначальном упрощенном подходе, я написалмного более сложная функция, которая ведет себя так же, какIObservable.Delay() (Я использовал .NET Reflector для чтения этого кода и использовал его в качестве основы моей функции). К сожалению, много шаблонной логики, такой какAnonymousObservable не является общедоступным за пределами кода system.reactive, поэтому мне пришлось скопировать и вставитьмного кода. Это решение, кажется, работает, но, учитывая его сложность, я менее уверен, что оно не содержит ошибок.

Я просто не могу поверить, что нет способа сделать это, используя некоторую комбинацию стандартных расширений Reactive. Я ненавижу чувствовать, что я без необходимости заново изобретаю колесо, и шаблон, который я пытаюсь создать, кажется довольно стандартным.

Ответы на вопрос(0)

Ваш ответ на вопрос