Sujeto RxJava con contrapresión: solo deje que el último valor se emita una vez que el flujo descendente haya terminado de consumir

Tengo un PublishSubject que llamaonNext() en algún evento de IU. El suscriptor generalmente tarda 2 segundos en completar su trabajo. Necesito ignorar todas las llamadas aonNext() excepto el último mientras el suscriptor está ocupado. Intenté lo siguiente, sin embargo, no puedo controlar el flujo. Parece que las solicitudes se ponen en cola y se procesan todas y cada una de las solicitudes (por lo que la presión de retroceso aparentemente no funciona). ¿Cómo puedo hacer que ignore todas las solicitudes excepto la última? (No quiero usardebounce ya que el código debe reaccionar de inmediato y cualquier tiempo de espera razonablemente pequeño no funcionará).

Además me doy cuenta de usarsubscribeOn con un sujeto no tiene efecto, entonces estoy usandoobserveOn hacer trabajo asíncrono en uno de los operadores. ¿Es este el enfoque correcto?

Subject<Boolean> loadingQueue = PublishSubject.<Boolean>create().toSerialized();

loadingQueue
  .toFlowable(BackpressureStrategy.LATEST)
  .observeOn(AndroidSchedulers.mainThread())
  .map(discarded -> {
    // PRE-LOADING
    Log.d("RXLOADING", "PRE-LOADING: " + Thread.currentThread().getName());
    return discarded;
   })
   .observeOn(Schedulers.computation())
   .map(b -> {
     Log.d("RXLOADING", "LOADING: " + Thread.currentThread().getName());
     Thread.sleep(2000);
     return b;
   })
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe(b -> {
      Log.d("RXLOADING", "FINISHED: " + Thread.currentThread().getName() + "\n\n");
   });


loadingQueue.onNext(true);
loadingQueue.onNext(true);
loadingQueue.onNext(true);
....

La salida que veo es:

PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
FINISHED: main

En cambio, espero que el código haga lo siguiente (es decir, cargar una vez, y mientras se carga, presionar para retener todas las solicitudes y emitir la última, una vez que el primer observador haya terminado, por lo que, en total, idealmente debería cargar solo dos veces a lo sumo):

PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main

PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main

Respuestas a la pregunta(1)

Su respuesta a la pregunta