RXJava - mach eine Pause beobachtbar (zum Beispiel mit Puffer und Fenster)
Ich möchte Observablen erstellen, die Folgendes tun:
Alle Elemente puffern, während sie pausiert sindsenden Sie sofort Elemente aus, während sie nicht pausiert sind Der Pause / Wiederaufnahme-Trigger muss von einem anderen beobachtbaren stammen.it muss save sein, um von Observablen verwendet zu werden, die nicht auf dem Hauptthread ausgeführt werden, und es muss save sein, um den pausierten / wiederaufgenommenen Status des Hauptthreads zu ändern.Ich möchte ein @ verwendBehaviorSubject<Boolean>
als Auslöser und binde diesen Auslöser an das @ einer AktivitonResume
undonPause
Veranstaltung. (Codebeispiel angehängt)
Frag
Ich habe etwas eingerichtet, aber es funktioniert nicht wie beabsichtigt. Ich benutze es wie folgt:
Observable o = ...;
// Variant 1
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue())
// Variant 2
// o = o.compose(RXPauser.applyPauser(getPauser()));
o
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
Zurzeit ist das Problem, dass Variante 1 gut funktionieren sollte, aber manchmal werden die Ereignisse einfach nicht ausgegeben - das Ventil gibt nichts aus, bis das Ventil funktioniert (kann ein Threading-Problem sein ...)! Lösung 2 ist viel einfacher und scheint zu funktionieren, aber ich bin nicht sicher, ob es wirklich besser ist, ich glaube nicht. Ich bin mir eigentlich nicht sicher, warum Lösung 1 manchmal fehlschlägt, also bin ich mir nicht sicher, ob Lösung 2 das (für mich derzeit unbekannte) Problem löst ...
Kann mir jemand sagen, was das Problem sein könnte oder ob die einfache Lösung zuverlässig funktionieren sollte? Oder mir eine zuverlässige Lösung zeigen?
Cod
RxValue
https: //gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f0
RXPauser Funktionen
public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser)
{
return observable -> pauser(observable, pauser);
}
private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser)
{
// this observable buffers all items that are emitted while emission is paused
Observable<T> sharedSource = source.publish().refCount();
Observable<T> queue = sharedSource
.buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed))
.flatMap(l -> Observable.from(l))
.doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t));
// this observable emits all items that are emitted while emission is not paused
Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> !isResumed))
.switchMap(tObservable -> tObservable)
.doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t));
// combine both observables
return queue.mergeWith(window)
.doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t));
}
Aktivitä
public class BaseActivity extends AppCompatActivity {
private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false);
public BaseActivity(Bundle savedInstanceState)
{
super(args);
final Class<?> clazz = this.getClass();
pauser
.doOnUnsubscribe(() -> {
L.d(clazz, "Pauser unsubscribed!");
})
.subscribe(aBoolean -> {
L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED"));
});
}
public PublishSubject<Boolean> getPauser()
{
return pauser;
}
@Override
protected void onResume()
{
super.onResume();
pauser.onNext(true);
}
@Override
protected void onPause()
{
pauser.onNext(false);
super.onPause();
}
}