Как реализовать наблюдаем последние в RxJava (RxScala)?

Я пытаюсь реализоватьObserveLatestOn оператор в RxJava (собственно, RxScala).

Этот оператор полезен, когда у нас есть быстрый производитель и медленный подписчик, но подписчик не заботится о потерянных элементах, когда он потребляет элемент.

Мраморная диаграмма:

--1---2---3----------5------6--7-8-9------|
--1=========>3===>---5=======>6======>9==>|

= символ представляет собой длительную работу, выполняемую подписчиком,> персонаж представляет работу только что закончил. В качестве канонического примера использования представьте производителя некоторых данных, которые необходимо отобразить, и средство отображения данных в качестве подписчика. Рендеринг занимает довольно много времени, но нам не нужно рендерить каждый шаг на экране, просто последний из них очень хорош.

На приведенной выше мраморной диаграмме производитель сигнализирует 1. Подписчик начинает ее обрабатывать, и это занимает много времени. Между тем, производитель выпускает 2 и 3, и не после этого подписчик заканчивает работу. Он видит, что последний элемент, выпущенный производителем, был 3, поэтому он начинает обрабатывать это. Это быстро, пока что не было произведено ничего нового, так что подписчик может отдохнуть. Затем наступает 5 и история продолжается таким же образом.

Я потратил часы, пытаясь реализовать этот, казалось бы, простой оператор, но я все еще не удовлетворен. Сама природа оператора указывает на то, что он должен быть асинхронным, он должен генерировать свои элементы в другом планировщике, чем он их получает. Но в то же время, конечно, я не хочу, чтобы какой-то поток занимал рабочий, пока нет никакой работы.

Это то, что я придумал до сих пор:

def observeLatestOn[T](o: Observable[T], scheduler: Scheduler): Observable[T] = {
  @volatile var maybeNextItem: Option[Notification[T]] = None
  @volatile var isWorkScheduled = false
  val itemsQueueLock = new Object()

  Observable(subscriber ⇒ {
    def signalToSubscriber(materializedItem: Notification[T]): Unit = {
      materializedItem match {
        case OnNext(item) ⇒ subscriber onNext item
        case OnError(error) ⇒ subscriber onError error
        case OnCompleted ⇒ subscriber.onCompleted()
      }
    }

    def queueItem(item: Notification[T]): Unit = {
      val worker = scheduler.createWorker

      val shouldScheduleWork = itemsQueueLock synchronized {
        val result = !isWorkScheduled
        maybeNextItem = Some(item)
        isWorkScheduled = true
        result
      }

      if (shouldScheduleWork) {
        worker.scheduleRec {
          val maybeNextItemToSignal = itemsQueueLock synchronized {
            val result = maybeNextItem
            if (result.isEmpty) {
              worker.unsubscribe()
              isWorkScheduled = false
            }
            maybeNextItem = None
            result
          }

          maybeNextItemToSignal foreach signalToSubscriber
        }
      }
    }

    o.takeWhile(_ ⇒ !subscriber.isUnsubscribed).subscribe(
      next ⇒ queueItem(OnNext(next)),
      error ⇒ queueItem(OnError(error)),
      () ⇒ queueItem(OnCompleted)
    )
  })
}

Кажется, это работает, но я не уверен, что здесь нет условий гонки или тупиков. Кроме того, я не уверен, что решение может быть упрощено. Я также думал о других подходах, таких как

умное использованиеOperatorDebounceWithSelectorкомбинация наблюдаемой, запрашивающей только один элемент за раз,observeOn а такжеonBackpressureBuffer(1)

Я также не знаю, как написать детерминированные тесты для этого. Работа запланированаscheduleRec не может быть прервано при использовании сTestSchedulerМне нужно использовать планировщик, который действительно работает в другом потоке. Мне трудно писать правильные модульные тесты для состязаний многопоточного кода.

Итак, остается вопрос: правильное ли мое решение? Есть ли более простой, лучший или более правильный подход к этому? И как проверить это на правильность?

Ответы на вопрос(2)

Ваш ответ на вопрос