Queue wie Subject in RxJava

Ich suche nach einem Betreff (oder ähnlichem), der:

Kann Elemente empfangen und in einer Warteschlange oder einem Puffer speichern, wenn keine Abonnenten vorhanden sindobald wir einen Abonnenten haben, werden alle Artikel verbraucht und nie wieder ausgegebeIch kann den Betreff abonnieren / abbestellen

BehaviorSubject würde fast die Arbeit erledigen, behält aber das zuletzt beobachtete Element bei.

AKTUALISIERE

uf der Grundlage der akzeptierten Antwort habe ich eine ähnliche Lösung für einen einzelnen beobachteten Gegenstand ausgearbeitet. Außerdem wurde ein Teil zum Abbestellen hinzugefügt, um Speicherverluste zu vermeiden.

class LastEventObservable private constructor(
        private val onSubscribe: OnSubscribe<Any>,
        private val state: State
) : Observable<Any>(onSubscribe) {

    fun emit(value: Any) {
        if (state.subscriber.hasObservers()) {
            state.subscriber.onNext(value)
        } else {
            state.lastItem = value
        }
    }

    companion object {
        fun create(): LastEventObservable {
            val state = State()

            val onSubscribe = OnSubscribe<Any> { subscriber ->
                just(state.lastItem)
                        .filter { it != null }
                        .doOnNext { subscriber.onNext(it) }
                        .doOnCompleted { state.lastItem = null }
                        .subscribe()

                val subscription = state.subscriber.subscribe(subscriber)

                subscriber.add(Subscriptions.create { subscription.unsubscribe() })
            }

            return LastEventObservable(onSubscribe, state)
        }
    }

    private class State {
        var lastItem: Any? = null
        val subscriber = PublishSubject.create<Any>()
    }
}

Antworten auf die Frage(6)

Ihre Antwort auf die Frage