RX - Explosões de grupo / lote de elementos em uma sequência observável

Eu tenho uma sequência observável. Quando o primeiro elemento é inserido,Gostaria de iniciar um cronômetro e agrupar os elementos inseridos subsequentes durante o período de tempo do cronômetro. Em seguida, o cronômetro não iniciaria novamente até que outro elemento fosse inserido na sequência.

Então, algo como isto:

--------|=====timespan====|---------------|=====timespan====|-------------->
        1  2 3 4    5                     6 7            8

produziria:

[1,2,3,4,5], [6,7,8] 

Tentei com Observable.Buffer () e um período de tempo, mas, a partir da minha experiência, posso ver que o cronômetro é iniciado assim que assinamos a sequência observável e é reiniciado assim que o cronômetro anterior é concluído.

Portanto, tendo a mesma sequência do exemplo anterior e usando o Buffer () com um intervalo de tempo, eu teria algo parecido com isto:

|=====timespan====|=====timespan====|=====timespan====|=====timespan====|-->
        1  2 3 4    5                      6 7           8

o que produziria isso:

[1,2,3,4], [5], [6,7], [8]

Aqui está como eu testei esse comportamento com o buffer:

var source = Observable.Concat(Observable.Timer(TimeSpan.FromSeconds(6)).Select(o => 1),
                               Observable.Timer(TimeSpan.FromSeconds(1)).Select(o => 2),
                               Observable.Timer(TimeSpan.FromSeconds(3)).Select(o => 3),
                               Observable.Never<int>());

Console.WriteLine("{0} => Started", DateTime.Now);
source.Buffer(TimeSpan.FromSeconds(4))
      .Subscribe(i => Console.WriteLine("{0} => [{1}]", DateTime.Now, string.Join(",", i)));

Com a saída:

4/24/2015 7:01:09 PM => Started
4/24/2015 7:01:13 PM => []
4/24/2015 7:01:17 PM => [1,2]
4/24/2015 7:01:21 PM => [3]
4/24/2015 7:01:25 PM => []
4/24/2015 7:01:29 PM => []
4/24/2015 7:01:33 PM => []

Alguém tem uma idéia de como fazer isso? Desde já, obrigado!

questionAnswers(1)

yourAnswerToTheQuestion