¿Cómo implementar observeLatestOn en RxJava (RxScala)?

Estoy tratando de implementar elObserveLatestOn operador en RxJava (en realidad, RxScala).

Este operador es útil cuando tenemos un productor rápido y un suscriptor lento, pero al suscriptor no le importan los artículos perdidos mientras consumía un artículo.

Un diagrama de mármol:

--1---2---3----------5------6--7-8-9------|
--1=========>3===>---5=======>6======>9==>|

los= El carácter representa un trabajo de larga duración realizado por el suscriptor, el> El personaje representa el trabajo que acaba de terminar. Como ejemplo canónico de uso, imagine un productor de algunos datos que deben mostrarse y un procesador de pantalla de los datos como suscriptor. El renderizado lleva bastante tiempo, pero no necesitamos renderizar cada paso en la pantalla, solo el último es perfectamente bueno.

En el diagrama de mármol anterior, el productor señala 1. El suscriptor comienza a procesarlo y tarda mucho tiempo. Mientras tanto, el productor emite 2 y 3, y no es después de eso que el suscriptor termina el trabajo. Se ve que el último elemento emitido por el productor fue 3, por lo que comienza a procesar eso. Eso es rápido, mientras tanto, no se ha producido ningún elemento nuevo, por lo que el suscriptor puede descansar. Luego, llega el 5 y la historia continúa de la misma manera.

He pasado horas tratando de implementar este operador aparentemente simple, pero todavía no estoy satisfecho. La naturaleza misma del operador indica que debe ser asíncrono, debe emitir sus elementos en un programador diferente al que los recibe. Pero al mismo tiempo, por supuesto, no quiero tener un hilo ocupado por un trabajador mientras no haya trabajo por hacer.

Esto es lo que se me ocurrió hasta ahora:

def observeLatestOn[T](o: Observable[T], scheduler: Scheduler): Observable[T] = {
  @volatile var maybeNextItem: Option[Notification[T]] = None
  @volatile var isWorkScheduled = false
  val itemsQueueLock = new Object()

  Observable(subscriber ⇒ {
    def signalToSubscriber(materializedItem: Notification[T]): Unit = {
      materializedItem match {
        case OnNext(item) ⇒ subscriber onNext item
        case OnError(error) ⇒ subscriber onError error
        case OnCompleted ⇒ subscriber.onCompleted()
      }
    }

    def queueItem(item: Notification[T]): Unit = {
      val worker = scheduler.createWorker

      val shouldScheduleWork = itemsQueueLock synchronized {
        val result = !isWorkScheduled
        maybeNextItem = Some(item)
        isWorkScheduled = true
        result
      }

      if (shouldScheduleWork) {
        worker.scheduleRec {
          val maybeNextItemToSignal = itemsQueueLock synchronized {
            val result = maybeNextItem
            if (result.isEmpty) {
              worker.unsubscribe()
              isWorkScheduled = false
            }
            maybeNextItem = None
            result
          }

          maybeNextItemToSignal foreach signalToSubscriber
        }
      }
    }

    o.takeWhile(_ ⇒ !subscriber.isUnsubscribed).subscribe(
      next ⇒ queueItem(OnNext(next)),
      error ⇒ queueItem(OnError(error)),
      () ⇒ queueItem(OnCompleted)
    )
  })
}

Parece funcionar, pero no estoy seguro de que no haya condiciones de carrera o puntos muertos. Además, no estoy seguro de si la solución podría simplificarse. También he estado pensando en otros enfoques, como

algún uso inteligente deOperatorDebounceWithSelectorcombinación de un observable que solicita solo un artículo a la vez,observeOn yonBackpressureBuffer(1)

Tampoco sé cómo escribir pruebas unitarias deterministas para esto. El trabajo programado porscheduleRec no se puede interrumpir cuando se usa conTestScheduler, Necesito usar un programador que realmente funcione en un hilo diferente. Me resulta difícil escribir pruebas unitarias correctas para las condiciones de carrera del código multiproceso.

Entonces, la pregunta sigue siendo: ¿es correcta mi solución? ¿Hay algún enfoque más simple, mejor o más correcto para esto? ¿Y cómo probar su corrección?

Respuestas a la pregunta(2)

Su respuesta a la pregunta