Como criar um fluxo Rx (RxJS) que pode ser alternado entre o item único e o modo em lote?

Eu tenho um fluxo Rx que é um feed de alterações de saída de um componente específico.

Ocasionalmente, quero poder inserir ummodo de lote no fluxo para que os itens passados paraonNext acumulam e são transmitidos somente quando o modo em lote é encerrado.

Normalmente, eu passo itens únicos pelo fluxo:

stream.onNext(1); 
stream.onNext(2);

Há um mapeamento 1 para 1 entre os itens passados paraonNext e os itens recebidos nose inscrever, então o snippet anterior resulta em duas chamadas parase inscrever com os valores 1 e 2.

omodo de lote Estou procurando pode funcionar algo como isto:

stream.enterBatchMode();
stream.onNext(1);
stream.onNext(2);
stream.exitBatchMode();

Neste caso eu querose inscrever para ser chamado apenas uma vez com a matriz concatenada única [1, 2].

Apenas para reiterar, preciso do modo em lote apenas algumas vezes, outras vezes passo os itens não concatenados.

Como posso obter esse comportamento com o Rx?

NOTA: Anteriormente, eu estava passando matrizes atravésonNext embora isso seja principalmente para que o tipo permaneça o mesmo no modo individual e no modo em lote.

Por exemplo:

stream.onNext([1, 2, 3]); 
stream.onNext([4, 5, 6]);

A inscrição recebe [1, 2, 3] e depois [4, 5, 6], mas quando no modo em lote a inscrição recebe os resultados concatenados [1, 2, 3, 4, 5, 6].

Então, quando você olha dessa maneira, é mais comoinconcatenado econcatenado modos (em oposição aIndividual emodo de lote) Mas acho que o problema é muito parecido.