Como implementar o observeLatestOn no RxJava (RxScala)?

Estou tentando implementar oObserveLatestOn operador em RxJava (na verdade, RxScala).

Esse operador é útil quando temos um produtor rápido e um assinante lento, mas o assinante não se importa com nenhum item perdido enquanto estava consumindo um item.

Um diagrama de mármore:

--1---2---3----------5------6--7-8-9------|
--1=========>3===>---5=======>6======>9==>|

o= caractere representa um trabalho de longa duração sendo executado pelo assinante, o> O caractere representa o trabalho que está acabando. Como exemplo de uso canônico, imagine um produtor de alguns dados que precisam ser exibidos e um renderizador de tela dos dados como assinante. A renderização leva muito tempo, mas não precisamos renderizar todas as etapas na tela, apenas a última é perfeitamente boa.

No diagrama de mármore acima, o produtor sinaliza 1. O assinante começa a processá-lo e leva muito tempo. Enquanto isso, o produtor emite 2 e 3, e não é depois disso que o assinante termina o trabalho. Ele vê que o último item emitido pelo produtor foi 3, então começa a processá-lo. Isso é rápido, nenhum item novo foi produzido enquanto isso, para que o assinante possa descansar. Então, 5 chega e a história continua da mesma maneira.

Passei horas tentando implementar esse operador aparentemente simples, mas ainda não estou satisfeito. A própria natureza do operador indica que ele deve ser assíncrono, deve emitir seus itens em um agendador diferente do que os recebe. Mas, ao mesmo tempo, é claro que não quero ter um segmento ocupado por um trabalhador enquanto não há trabalho a ser feito.

Isto é o que eu vim até agora:

def observeLatestOn[T](o: Observable[T], scheduler: Scheduler): Observable[T] = {
  @volatile var maybeNextItem: Option[Notification[T]] = None
  @volatile var isWorkScheduled = false
  val itemsQueueLock = new Object()

  Observable(subscriber ⇒ {
    def signalToSubscriber(materializedItem: Notification[T]): Unit = {
      materializedItem match {
        case OnNext(item) ⇒ subscriber onNext item
        case OnError(error) ⇒ subscriber onError error
        case OnCompleted ⇒ subscriber.onCompleted()
      }
    }

    def queueItem(item: Notification[T]): Unit = {
      val worker = scheduler.createWorker

      val shouldScheduleWork = itemsQueueLock synchronized {
        val result = !isWorkScheduled
        maybeNextItem = Some(item)
        isWorkScheduled = true
        result
      }

      if (shouldScheduleWork) {
        worker.scheduleRec {
          val maybeNextItemToSignal = itemsQueueLock synchronized {
            val result = maybeNextItem
            if (result.isEmpty) {
              worker.unsubscribe()
              isWorkScheduled = false
            }
            maybeNextItem = None
            result
          }

          maybeNextItemToSignal foreach signalToSubscriber
        }
      }
    }

    o.takeWhile(_ ⇒ !subscriber.isUnsubscribed).subscribe(
      next ⇒ queueItem(OnNext(next)),
      error ⇒ queueItem(OnError(error)),
      () ⇒ queueItem(OnCompleted)
    )
  })
}

Parece funcionar, mas não me sinto confiante de que não haja condições de corrida ou impasses. Além disso, não tenho certeza se a solução talvez possa ser mais simples. Eu também estive pensando em outras abordagens, como

algum uso inteligente deOperatorDebounceWithSelectorcombinação de um observável solicitando apenas um item por vez,observeOn eonBackpressureBuffer(1)

Também não sei como escrever testes de unidade determinísticos para isso. O trabalho programado porscheduleRec não pode ser interrompido quando usado comTestScheduler, Preciso usar um agendador que realmente funcione em um thread diferente. Acho difícil escrever testes de unidade corretos para condições de corrida de código multiencadeado.

Portanto, a pergunta permanece: minha solução está correta? Existe alguma abordagem mais simples, melhor ou mais correta para isso? E como testar sua correção?