Просто используйте
я есть PublishSubject, который вызываетonNext()
на каком-то событии пользовательского интерфейса. Абоненту обычно требуется 2 секунды, чтобы завершить свою работу. Мне нужно игнорировать все звонкиonNext()
кроме последнего, пока абонент занят. Я попробовал следующее, однако я не могу контролировать поток. Похоже, что запросы помещаются в очередь, и каждый запрос обрабатывается (и поэтому обратное давление, похоже, не работает). Как я могу заставить его игнорировать все запросы, кроме последнего? (Я не хочу использоватьdebounce
так как код должен реагировать немедленно, и любой разумно небольшой тайм-аут не будет работать).
Более того, я понимаю, используяsubscribeOn
с темой не имеет никакого эффекта, поэтому я используюobserveOn
сделать асинхронную работу в одном из операторов. Это правильный подход?
Subject<Boolean> loadingQueue = PublishSubject.<Boolean>create().toSerialized();
loadingQueue
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(AndroidSchedulers.mainThread())
.map(discarded -> {
// PRE-LOADING
Log.d("RXLOADING", "PRE-LOADING: " + Thread.currentThread().getName());
return discarded;
})
.observeOn(Schedulers.computation())
.map(b -> {
Log.d("RXLOADING", "LOADING: " + Thread.currentThread().getName());
Thread.sleep(2000);
return b;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(b -> {
Log.d("RXLOADING", "FINISHED: " + Thread.currentThread().getName() + "\n\n");
});
loadingQueue.onNext(true);
loadingQueue.onNext(true);
loadingQueue.onNext(true);
....
Вывод, который я вижу:
PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
FINISHED: main
Вместо этого я ожидаю, что код выполнит следующее (то есть загрузит один раз, и пока он загружается, обратное давление будет сдерживать все запросы и выдавать последний, как только первый наблюдатель завершит работу - таким образом, в целом он должен загружаться только дважды в большинстве):
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main