Sujeto RxJava con contrapresión: solo deje que el último valor se emita una vez que el flujo descendente haya terminado de consumir
Tengo un PublishSubject que llamaonNext()
en algún evento de IU. El suscriptor generalmente tarda 2 segundos en completar su trabajo. Necesito ignorar todas las llamadas aonNext()
excepto el último mientras el suscriptor está ocupado. Intenté lo siguiente, sin embargo, no puedo controlar el flujo. Parece que las solicitudes se ponen en cola y se procesan todas y cada una de las solicitudes (por lo que la presión de retroceso aparentemente no funciona). ¿Cómo puedo hacer que ignore todas las solicitudes excepto la última? (No quiero usardebounce
ya que el código debe reaccionar de inmediato y cualquier tiempo de espera razonablemente pequeño no funcionará).
Además me doy cuenta de usarsubscribeOn
con un sujeto no tiene efecto, entonces estoy usandoobserveOn
hacer trabajo asíncrono en uno de los operadores. ¿Es este el enfoque correcto?
Subject<Boolean> loadingQueue = PublishSubject.<Boolean>create().toSerialized();
loadingQueue
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(AndroidSchedulers.mainThread())
.map(discarded -> {
// PRE-LOADING
Log.d("RXLOADING", "PRE-LOADING: " + Thread.currentThread().getName());
return discarded;
})
.observeOn(Schedulers.computation())
.map(b -> {
Log.d("RXLOADING", "LOADING: " + Thread.currentThread().getName());
Thread.sleep(2000);
return b;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(b -> {
Log.d("RXLOADING", "FINISHED: " + Thread.currentThread().getName() + "\n\n");
});
loadingQueue.onNext(true);
loadingQueue.onNext(true);
loadingQueue.onNext(true);
....
La salida que veo es:
PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
FINISHED: main
En cambio, espero que el código haga lo siguiente (es decir, cargar una vez, y mientras se carga, presionar para retener todas las solicitudes y emitir la última, una vez que el primer observador haya terminado, por lo que, en total, idealmente debería cargar solo dos veces a lo sumo):
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main