Como implementar o observeLatestOn no RxJava (RxScala)?
Estou tentando implementar oObserveLatestOn operador em RxJava (na verdade, RxScala).
Esse operador é útil quando temos um produtor rápido e um assinante lento, mas o assinante não se importa com nenhum item perdido enquanto estava consumindo um item.
Um diagrama de mármore:
--1---2---3----------5------6--7-8-9------|
--1=========>3===>---5=======>6======>9==>|
o=
caractere representa um trabalho de longa duração sendo executado pelo assinante, o>
O caractere representa o trabalho que está acabando. Como exemplo de uso canônico, imagine um produtor de alguns dados que precisam ser exibidos e um renderizador de tela dos dados como assinante. A renderização leva muito tempo, mas não precisamos renderizar todas as etapas na tela, apenas a última é perfeitamente boa.
No diagrama de mármore acima, o produtor sinaliza 1. O assinante começa a processá-lo e leva muito tempo. Enquanto isso, o produtor emite 2 e 3, e não é depois disso que o assinante termina o trabalho. Ele vê que o último item emitido pelo produtor foi 3, então começa a processá-lo. Isso é rápido, nenhum item novo foi produzido enquanto isso, para que o assinante possa descansar. Então, 5 chega e a história continua da mesma maneira.
Passei horas tentando implementar esse operador aparentemente simples, mas ainda não estou satisfeito. A própria natureza do operador indica que ele deve ser assíncrono, deve emitir seus itens em um agendador diferente do que os recebe. Mas, ao mesmo tempo, é claro que não quero ter um segmento ocupado por um trabalhador enquanto não há trabalho a ser feito.
Isto é o que eu vim até agora:
def observeLatestOn[T](o: Observable[T], scheduler: Scheduler): Observable[T] = {
@volatile var maybeNextItem: Option[Notification[T]] = None
@volatile var isWorkScheduled = false
val itemsQueueLock = new Object()
Observable(subscriber ⇒ {
def signalToSubscriber(materializedItem: Notification[T]): Unit = {
materializedItem match {
case OnNext(item) ⇒ subscriber onNext item
case OnError(error) ⇒ subscriber onError error
case OnCompleted ⇒ subscriber.onCompleted()
}
}
def queueItem(item: Notification[T]): Unit = {
val worker = scheduler.createWorker
val shouldScheduleWork = itemsQueueLock synchronized {
val result = !isWorkScheduled
maybeNextItem = Some(item)
isWorkScheduled = true
result
}
if (shouldScheduleWork) {
worker.scheduleRec {
val maybeNextItemToSignal = itemsQueueLock synchronized {
val result = maybeNextItem
if (result.isEmpty) {
worker.unsubscribe()
isWorkScheduled = false
}
maybeNextItem = None
result
}
maybeNextItemToSignal foreach signalToSubscriber
}
}
}
o.takeWhile(_ ⇒ !subscriber.isUnsubscribed).subscribe(
next ⇒ queueItem(OnNext(next)),
error ⇒ queueItem(OnError(error)),
() ⇒ queueItem(OnCompleted)
)
})
}
Parece funcionar, mas não me sinto confiante de que não haja condições de corrida ou impasses. Além disso, não tenho certeza se a solução talvez possa ser mais simples. Eu também estive pensando em outras abordagens, como
algum uso inteligente deOperatorDebounceWithSelector
combinação de um observável solicitando apenas um item por vez,observeOn
eonBackpressureBuffer(1)
Também não sei como escrever testes de unidade determinísticos para isso. O trabalho programado porscheduleRec
não pode ser interrompido quando usado comTestScheduler
, Preciso usar um agendador que realmente funcione em um thread diferente. Acho difícil escrever testes de unidade corretos para condições de corrida de código multiencadeado.
Portanto, a pergunta permanece: minha solução está correta? Existe alguma abordagem mais simples, melhor ou mais correta para isso? E como testar sua correção?