Assunto RxJava com contrapressão - deixe apenas o último valor emitir quando o downstream terminar de consumir

Eu tenho um PublishSubject que chamaonNext() em algum evento da interface do usuário. O assinante normalmente leva 2 segundos para concluir seu trabalho. Preciso ignorar todas as chamadas paraonNext() exceto o último enquanto o assinante estiver ocupado. Eu tentei o seguinte, no entanto, não consigo controlar o fluxo. As solicitações parecem ficar na fila e cada solicitação é processada (e, portanto, a pressão de retorno aparentemente não está funcionando). Como faço para ignorar todos os pedidos, exceto o último? (Eu não quero usardebounce pois o código precisa reagir imediatamente e qualquer tempo limite razoavelmente pequeno não funcionará).

Além disso, eu percebo usandosubscribeOn com um assunto não tem efeito, então eu estou usandoobserveOn para executar o trabalho assíncrono em um dos operadores. Essa é a abordagem correta?

Subject<Boolean> loadingQueue = PublishSubject.<Boolean>create().toSerialized();

loadingQueue
  .toFlowable(BackpressureStrategy.LATEST)
  .observeOn(AndroidSchedulers.mainThread())
  .map(discarded -> {
    // PRE-LOADING
    Log.d("RXLOADING", "PRE-LOADING: " + Thread.currentThread().getName());
    return discarded;
   })
   .observeOn(Schedulers.computation())
   .map(b -> {
     Log.d("RXLOADING", "LOADING: " + Thread.currentThread().getName());
     Thread.sleep(2000);
     return b;
   })
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe(b -> {
      Log.d("RXLOADING", "FINISHED: " + Thread.currentThread().getName() + "\n\n");
   });


loadingQueue.onNext(true);
loadingQueue.onNext(true);
loadingQueue.onNext(true);
....

A saída que vejo é:

PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
FINISHED: main

Em vez disso, espero que o código faça o seguinte (ou seja, carregue uma vez e durante o carregamento, a contrapressão retenha todas as solicitações e emita a última, uma vez que o primeiro observador tenha terminado - portanto, no total, o ideal seria carregar apenas duas vezes no máximo):

PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main

PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main

questionAnswers(1)

yourAnswerToTheQuestion