RXJava - torne um observável pausável (com buffer e janela, por exemplo)

Eu quero criar observáveis que fazem o seguinte:

buffer todos os itens, enquanto eles estão em pausaemitem itens imediatamente, enquanto não estão em pausao gatilho de pausa / retomada deve vir de outro observáveldeve ser salvo para ser usado por observáveis que não são executados no encadeamento principal e deve ser salvo alterar o estado de pausa / retomado do encadeamento principal

Eu quero usar umBehaviorSubject<Boolean> como gatilho e vincule esse gatilho a uma atividadeonResume eonPause evento. (Exemplo de código anexado)

Pergunta, questão

Eu configurei algo, mas não está funcionando conforme o esperado. Eu o uso da seguinte forma:

Observable o = ...;
// Variant 1
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue())
// Variant 2
// o = o.compose(RXPauser.applyPauser(getPauser()));
o
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();

Atualmente, o problema é que a variante 1 deve funcionar bem, mas às vezes os eventos simplesmente não são emitidos - a válvula não está emitindo, até que tudo esteja funcionando (pode ser um problema de rosqueamento ...)! A solução 2 é muito mais simples e parece funcionar, mas não tenho certeza se é realmente melhor, acho que não. Na verdade, eu não tenho certeza, por que a solução 1 está falhando às vezes, então não tenho certeza se a solução 2 resolve o problema (atualmente desconhecido para mim) ...

Alguém pode me dizer qual poderia ser o problema ou se a solução simples deve funcionar de maneira confiável? Ou me mostra uma solução confiável?

Código

RxValue

https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08

Funções RXPauser

public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser)
{
    return observable -> pauser(observable, pauser);
}

private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser)
{
    // this observable buffers all items that are emitted while emission is paused
    Observable<T> sharedSource = source.publish().refCount();
    Observable<T> queue = sharedSource
            .buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed))
            .flatMap(l -> Observable.from(l))
            .doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t));

    // this observable emits all items that are emitted while emission is not paused
    Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean ->  pauser.distinctUntilChanged().filter(isResumed -> !isResumed))
            .switchMap(tObservable -> tObservable)
            .doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t));

    // combine both observables
    return queue.mergeWith(window)
            .doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t));
}

Atividade

public class BaseActivity extends AppCompatActivity {

    private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false);

    public BaseActivity(Bundle savedInstanceState)
    {
        super(args);
        final Class<?> clazz = this.getClass();
        pauser
                .doOnUnsubscribe(() -> {
                    L.d(clazz, "Pauser unsubscribed!");
                })
                .subscribe(aBoolean -> {
                    L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED"));
                });
    }

    public PublishSubject<Boolean> getPauser()
    {
        return pauser;
    }

    @Override
    protected void onResume()
    {
        super.onResume();
        pauser.onNext(true);
    }

    @Override
    protected void onPause()
    {
        pauser.onNext(false);
        super.onPause();
    }
}

questionAnswers(2)

yourAnswerToTheQuestion