¿Cómo implementar observeLatestOn en RxJava (RxScala)?
Estoy tratando de implementar elObserveLatestOn operador en RxJava (en realidad, RxScala).
Este operador es útil cuando tenemos un productor rápido y un suscriptor lento, pero al suscriptor no le importan los artículos perdidos mientras consumía un artículo.
Un diagrama de mármol:
--1---2---3----------5------6--7-8-9------|
--1=========>3===>---5=======>6======>9==>|
los=
El carácter representa un trabajo de larga duración realizado por el suscriptor, el>
El personaje representa el trabajo que acaba de terminar. Como ejemplo canónico de uso, imagine un productor de algunos datos que deben mostrarse y un procesador de pantalla de los datos como suscriptor. El renderizado lleva bastante tiempo, pero no necesitamos renderizar cada paso en la pantalla, solo el último es perfectamente bueno.
En el diagrama de mármol anterior, el productor señala 1. El suscriptor comienza a procesarlo y tarda mucho tiempo. Mientras tanto, el productor emite 2 y 3, y no es después de eso que el suscriptor termina el trabajo. Se ve que el último elemento emitido por el productor fue 3, por lo que comienza a procesar eso. Eso es rápido, mientras tanto, no se ha producido ningún elemento nuevo, por lo que el suscriptor puede descansar. Luego, llega el 5 y la historia continúa de la misma manera.
He pasado horas tratando de implementar este operador aparentemente simple, pero todavía no estoy satisfecho. La naturaleza misma del operador indica que debe ser asíncrono, debe emitir sus elementos en un programador diferente al que los recibe. Pero al mismo tiempo, por supuesto, no quiero tener un hilo ocupado por un trabajador mientras no haya trabajo por hacer.
Esto es lo que se me ocurrió hasta ahora:
def observeLatestOn[T](o: Observable[T], scheduler: Scheduler): Observable[T] = {
@volatile var maybeNextItem: Option[Notification[T]] = None
@volatile var isWorkScheduled = false
val itemsQueueLock = new Object()
Observable(subscriber ⇒ {
def signalToSubscriber(materializedItem: Notification[T]): Unit = {
materializedItem match {
case OnNext(item) ⇒ subscriber onNext item
case OnError(error) ⇒ subscriber onError error
case OnCompleted ⇒ subscriber.onCompleted()
}
}
def queueItem(item: Notification[T]): Unit = {
val worker = scheduler.createWorker
val shouldScheduleWork = itemsQueueLock synchronized {
val result = !isWorkScheduled
maybeNextItem = Some(item)
isWorkScheduled = true
result
}
if (shouldScheduleWork) {
worker.scheduleRec {
val maybeNextItemToSignal = itemsQueueLock synchronized {
val result = maybeNextItem
if (result.isEmpty) {
worker.unsubscribe()
isWorkScheduled = false
}
maybeNextItem = None
result
}
maybeNextItemToSignal foreach signalToSubscriber
}
}
}
o.takeWhile(_ ⇒ !subscriber.isUnsubscribed).subscribe(
next ⇒ queueItem(OnNext(next)),
error ⇒ queueItem(OnError(error)),
() ⇒ queueItem(OnCompleted)
)
})
}
Parece funcionar, pero no estoy seguro de que no haya condiciones de carrera o puntos muertos. Además, no estoy seguro de si la solución podría simplificarse. También he estado pensando en otros enfoques, como
algún uso inteligente deOperatorDebounceWithSelector
combinación de un observable que solicita solo un artículo a la vez,observeOn
yonBackpressureBuffer(1)
Tampoco sé cómo escribir pruebas unitarias deterministas para esto. El trabajo programado porscheduleRec
no se puede interrumpir cuando se usa conTestScheduler
, Necesito usar un programador que realmente funcione en un hilo diferente. Me resulta difícil escribir pruebas unitarias correctas para las condiciones de carrera del código multiproceso.
Entonces, la pregunta sigue siendo: ¿es correcta mi solución? ¿Hay algún enfoque más simple, mejor o más correcto para esto? ¿Y cómo probar su corrección?