RXJava - mach eine Pause beobachtbar (zum Beispiel mit Puffer und Fenster)

Ich möchte Observablen erstellen, die Folgendes tun:

Alle Elemente puffern, während sie pausiert sindsenden Sie sofort Elemente aus, während sie nicht pausiert sind Der Pause / Wiederaufnahme-Trigger muss von einem anderen beobachtbaren stammen.it muss save sein, um von Observablen verwendet zu werden, die nicht auf dem Hauptthread ausgeführt werden, und es muss save sein, um den pausierten / wiederaufgenommenen Status des Hauptthreads zu ändern.

Ich möchte ein @ verwendBehaviorSubject<Boolean> als Auslöser und binde diesen Auslöser an das @ einer AktivitonResume undonPause Veranstaltung. (Codebeispiel angehängt)

Frag

Ich habe etwas eingerichtet, aber es funktioniert nicht wie beabsichtigt. Ich benutze es wie folgt:

Observable o = ...;
// Variant 1
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue())
// Variant 2
// o = o.compose(RXPauser.applyPauser(getPauser()));
o
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();

Zurzeit ist das Problem, dass Variante 1 gut funktionieren sollte, aber manchmal werden die Ereignisse einfach nicht ausgegeben - das Ventil gibt nichts aus, bis das Ventil funktioniert (kann ein Threading-Problem sein ...)! Lösung 2 ist viel einfacher und scheint zu funktionieren, aber ich bin nicht sicher, ob es wirklich besser ist, ich glaube nicht. Ich bin mir eigentlich nicht sicher, warum Lösung 1 manchmal fehlschlägt, also bin ich mir nicht sicher, ob Lösung 2 das (für mich derzeit unbekannte) Problem löst ...

Kann mir jemand sagen, was das Problem sein könnte oder ob die einfache Lösung zuverlässig funktionieren sollte? Oder mir eine zuverlässige Lösung zeigen?

Cod

RxValue

https: //gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f0

RXPauser Funktionen

public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser)
{
    return observable -> pauser(observable, pauser);
}

private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser)
{
    // this observable buffers all items that are emitted while emission is paused
    Observable<T> sharedSource = source.publish().refCount();
    Observable<T> queue = sharedSource
            .buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed))
            .flatMap(l -> Observable.from(l))
            .doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t));

    // this observable emits all items that are emitted while emission is not paused
    Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean ->  pauser.distinctUntilChanged().filter(isResumed -> !isResumed))
            .switchMap(tObservable -> tObservable)
            .doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t));

    // combine both observables
    return queue.mergeWith(window)
            .doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t));
}

Aktivitä

public class BaseActivity extends AppCompatActivity {

    private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false);

    public BaseActivity(Bundle savedInstanceState)
    {
        super(args);
        final Class<?> clazz = this.getClass();
        pauser
                .doOnUnsubscribe(() -> {
                    L.d(clazz, "Pauser unsubscribed!");
                })
                .subscribe(aBoolean -> {
                    L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED"));
                });
    }

    public PublishSubject<Boolean> getPauser()
    {
        return pauser;
    }

    @Override
    protected void onResume()
    {
        super.onResume();
        pauser.onNext(true);
    }

    @Override
    protected void onPause()
    {
        pauser.onNext(false);
        super.onPause();
    }
}

Antworten auf die Frage(4)

Ihre Antwort auf die Frage