Просто используйте

я есть PublishSubject, который вызываетonNext() на каком-то событии пользовательского интерфейса. Абоненту обычно требуется 2 секунды, чтобы завершить свою работу. Мне нужно игнорировать все звонкиonNext() кроме последнего, пока абонент занят. Я попробовал следующее, однако я не могу контролировать поток. Похоже, что запросы помещаются в очередь, и каждый запрос обрабатывается (и поэтому обратное давление, похоже, не работает). Как я могу заставить его игнорировать все запросы, кроме последнего? (Я не хочу использоватьdebounce так как код должен реагировать немедленно, и любой разумно небольшой тайм-аут не будет работать).

Более того, я понимаю, используяsubscribeOn с темой не имеет никакого эффекта, поэтому я используюobserveOn сделать асинхронную работу в одном из операторов. Это правильный подход?

Subject<Boolean> loadingQueue = PublishSubject.<Boolean>create().toSerialized();

loadingQueue
  .toFlowable(BackpressureStrategy.LATEST)
  .observeOn(AndroidSchedulers.mainThread())
  .map(discarded -> {
    // PRE-LOADING
    Log.d("RXLOADING", "PRE-LOADING: " + Thread.currentThread().getName());
    return discarded;
   })
   .observeOn(Schedulers.computation())
   .map(b -> {
     Log.d("RXLOADING", "LOADING: " + Thread.currentThread().getName());
     Thread.sleep(2000);
     return b;
   })
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe(b -> {
      Log.d("RXLOADING", "FINISHED: " + Thread.currentThread().getName() + "\n\n");
   });


loadingQueue.onNext(true);
loadingQueue.onNext(true);
loadingQueue.onNext(true);
....

Вывод, который я вижу:

PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
FINISHED: main

Вместо этого я ожидаю, что код выполнит следующее (то есть загрузит один раз, и пока он загружается, обратное давление будет сдерживать все запросы и выдавать последний, как только первый наблюдатель завершит работу - таким образом, в целом он должен загружаться только дважды в большинстве):

PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main

PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main

Ответы на вопрос(1)

Ваш ответ на вопрос