RX - групповые / пакетные пакеты элементов в наблюдаемой последовательности

У меня наблюдаемая последовательность. Когда первый элемент вставлен,Я хотел бы запустить таймер и пакетные последующие вставленные элементы во время таймера. Тогда таймер не запустится снова, пока в последовательность не будет вставлен другой элемент.

Так что-то вроде этого:

--------|=====timespan====|---------------|=====timespan====|-------------->
        1  2 3 4    5                     6 7            8

будет производить:

[1,2,3,4,5], [6,7,8] 

Я попытался с Observable.Buffer () и временным интервалом, но из моих экспериментов я вижу, что таймер запускается, как только мы подписываемся на наблюдаемую последовательность, и перезапускается, как только предыдущий таймер завершается.

Таким образом, имея ту же последовательность, что и в предыдущем примере, и используя Buffer () с интервалом времени, я бы получил что-то вроде этого:

|=====timespan====|=====timespan====|=====timespan====|=====timespan====|-->
        1  2 3 4    5                      6 7           8

который произвел бы это:

[1,2,3,4], [5], [6,7], [8]

Вот как я тестировал это поведение с помощью Buffer:

var source = Observable.Concat(Observable.Timer(TimeSpan.FromSeconds(6)).Select(o => 1),
                               Observable.Timer(TimeSpan.FromSeconds(1)).Select(o => 2),
                               Observable.Timer(TimeSpan.FromSeconds(3)).Select(o => 3),
                               Observable.Never<int>());

Console.WriteLine("{0} => Started", DateTime.Now);
source.Buffer(TimeSpan.FromSeconds(4))
      .Subscribe(i => Console.WriteLine("{0} => [{1}]", DateTime.Now, string.Join(",", i)));

С выходом:

4/24/2015 7:01:09 PM => Started
4/24/2015 7:01:13 PM => []
4/24/2015 7:01:17 PM => [1,2]
4/24/2015 7:01:21 PM => [3]
4/24/2015 7:01:25 PM => []
4/24/2015 7:01:29 PM => []
4/24/2015 7:01:33 PM => []

У кого-нибудь есть идеи, как это сделать? Заранее спасибо!

Ответы на вопрос(1)

Ваш ответ на вопрос