Assunto RxJava com contrapressão - deixe apenas o último valor emitir quando o downstream terminar de consumir
Eu tenho um PublishSubject que chamaonNext()
em algum evento da interface do usuário. O assinante normalmente leva 2 segundos para concluir seu trabalho. Preciso ignorar todas as chamadas paraonNext()
exceto o último enquanto o assinante estiver ocupado. Eu tentei o seguinte, no entanto, não consigo controlar o fluxo. As solicitações parecem ficar na fila e cada solicitação é processada (e, portanto, a pressão de retorno aparentemente não está funcionando). Como faço para ignorar todos os pedidos, exceto o último? (Eu não quero usardebounce
pois o código precisa reagir imediatamente e qualquer tempo limite razoavelmente pequeno não funcionará).
Além disso, eu percebo usandosubscribeOn
com um assunto não tem efeito, então eu estou usandoobserveOn
para executar o trabalho assíncrono em um dos operadores. Essa é a abordagem correta?
Subject<Boolean> loadingQueue = PublishSubject.<Boolean>create().toSerialized();
loadingQueue
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(AndroidSchedulers.mainThread())
.map(discarded -> {
// PRE-LOADING
Log.d("RXLOADING", "PRE-LOADING: " + Thread.currentThread().getName());
return discarded;
})
.observeOn(Schedulers.computation())
.map(b -> {
Log.d("RXLOADING", "LOADING: " + Thread.currentThread().getName());
Thread.sleep(2000);
return b;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(b -> {
Log.d("RXLOADING", "FINISHED: " + Thread.currentThread().getName() + "\n\n");
});
loadingQueue.onNext(true);
loadingQueue.onNext(true);
loadingQueue.onNext(true);
....
A saída que vejo é:
PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
FINISHED: main
Em vez disso, espero que o código faça o seguinte (ou seja, carregue uma vez e durante o carregamento, a contrapressão retenha todas as solicitações e emita a última, uma vez que o primeiro observador tenha terminado - portanto, no total, o ideal seria carregar apenas duas vezes no máximo):
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main