Wie implementiere ich observLatestOn in RxJava (RxScala)?
Ich versuche das @ zu implementier ObserveLatestOn Operator in RxJava (eigentlich RxScala).
Dieser Operator ist nützlich, wenn wir einen schnellen Produzenten und einen langsamen Abonnenten haben, der Abonnent sich jedoch nicht um verlorene Artikel kümmert, während er einen Artikel konsumierte.
Ein Marmordiagramm:
--1---2---3----------5------6--7-8-9------|
--1=========>3===>---5=======>6======>9==>|
Das=
as Zeichen @ steht für eine lang andauernde Arbeit, die vom Abonnenten ausgeführt wir>
as Zeichen @ steht für die gerade abgeschlossene Arbeit. Stellen Sie sich als kanonisches Anwendungsbeispiel einen Produzenten einiger Daten vor, die angezeigt werden müssen, und einen Bildschirmrenderer der Daten als Abonnenten. Das Rendern dauert ziemlich lange, aber wir müssen nicht jeden Schritt auf dem Bildschirm rendern, nur der letzte ist perfekt.
In dem obigen Marmordiagramm signalisiert der Produzent 1. Der Abonnent beginnt, es zu verarbeiten, und es dauert lange. In der Zwischenzeit gibt der Produzent 2 und 3 aus, und erst danach beendet der Abonnent die Arbeit. Es wird angezeigt, dass der letzte vom Hersteller ausgestrahlte Artikel 3 war. Daher beginnt die Verarbeitung dieses Artikels. Das geht schnell, in der Zwischenzeit wurde kein neuer Artikel produziert, damit sich der Abonnent ausruhen kann. Dann kommen 5 und die Geschichte geht auf die gleiche Weise weiter.
Ich habe stundenlang versucht, diesen scheinbar einfachen Operator zu implementieren, bin aber immer noch nicht zufrieden. Die Art des Operators gibt an, dass es sich um einen asynchronen Operator handeln sollte. Die Elemente sollten auf einem anderen Scheduler ausgegeben werden, als sie empfangen wurden. Gleichzeitig möchte ich natürlich nicht, dass ein Thread von einem Arbeiter besetzt wird, solange keine Arbeit zu erledigen ist.
Das habe ich mir bisher ausgedacht:
def observeLatestOn[T](o: Observable[T], scheduler: Scheduler): Observable[T] = {
@volatile var maybeNextItem: Option[Notification[T]] = None
@volatile var isWorkScheduled = false
val itemsQueueLock = new Object()
Observable(subscriber ⇒ {
def signalToSubscriber(materializedItem: Notification[T]): Unit = {
materializedItem match {
case OnNext(item) ⇒ subscriber onNext item
case OnError(error) ⇒ subscriber onError error
case OnCompleted ⇒ subscriber.onCompleted()
}
}
def queueItem(item: Notification[T]): Unit = {
val worker = scheduler.createWorker
val shouldScheduleWork = itemsQueueLock synchronized {
val result = !isWorkScheduled
maybeNextItem = Some(item)
isWorkScheduled = true
result
}
if (shouldScheduleWork) {
worker.scheduleRec {
val maybeNextItemToSignal = itemsQueueLock synchronized {
val result = maybeNextItem
if (result.isEmpty) {
worker.unsubscribe()
isWorkScheduled = false
}
maybeNextItem = None
result
}
maybeNextItemToSignal foreach signalToSubscriber
}
}
}
o.takeWhile(_ ⇒ !subscriber.isUnsubscribed).subscribe(
next ⇒ queueItem(OnNext(next)),
error ⇒ queueItem(OnError(error)),
() ⇒ queueItem(OnCompleted)
)
})
}
Es scheint zu funktionieren, aber ich bin mir nicht sicher, dass es keine Rennbedingungen oder Deadlocks gibt. Ich bin mir auch nicht sicher, ob die Lösung vielleicht einfacher sein könnte. Ich habe auch über andere Ansätze nachgedacht, wie
einige kluge Verwendung vonOperatorDebounceWithSelector
Kombination eines beobachtbaren Objekts, bei der jeweils nur ein Element angefordert wird,observeOn
undonBackpressureBuffer(1)
Ich weiß auch nicht, wie man dazu deterministische Unit-Tests schreibt. Die von @ geplanten ArbeitscheduleRec
kann bei Verwendung mit @ nicht unterbrochen werdTestScheduler
, Ich muss einen Scheduler verwenden, der wirklich auf einem anderen Thread funktioniert. Ich finde es schwierig, korrekte Komponententests für Race-Bedingungen mit Multithread-Code zu schreiben.
Also bleibt die Frage: Ist meine Lösung korrekt? Gibt es einen einfacheren, besseren oder korrekteren Ansatz dafür? Und wie prüfe ich die Richtigkeit?