Очередь как Тема в RxJava

Я ищу предмет (или что-то подобное), который может:

Может получать элементы и хранить их в очереди или буфере, если нет подписчиковКак только у нас есть подписчик, все предметы потребляются и никогда не испускаются сноваЯ могу подписаться / отписаться от / от темы

BehaviorSubject почти сделает работу, но он сохранит последний наблюдаемый элемент.

ОБНОВИТЬ

На основании принятого ответа я разработал аналогичное решение для единственного наблюдаемого объекта. Также добавлена ​​отписочная часть, чтобы избежать утечек памяти.

class LastEventObservable private constructor(
        private val onSubscribe: OnSubscribe<Any>,
        private val state: State
) : Observable<Any>(onSubscribe) {

    fun emit(value: Any) {
        if (state.subscriber.hasObservers()) {
            state.subscriber.onNext(value)
        } else {
            state.lastItem = value
        }
    }

    companion object {
        fun create(): LastEventObservable {
            val state = State()

            val onSubscribe = OnSubscribe<Any> { subscriber ->
                just(state.lastItem)
                        .filter { it != null }
                        .doOnNext { subscriber.onNext(it) }
                        .doOnCompleted { state.lastItem = null }
                        .subscribe()

                val subscription = state.subscriber.subscribe(subscriber)

                subscriber.add(Subscriptions.create { subscription.unsubscribe() })
            }

            return LastEventObservable(onSubscribe, state)
        }
    }

    private class State {
        var lastItem: Any? = null
        val subscriber = PublishSubject.create<Any>()
    }
}

Ответы на вопрос(3)

Ваш ответ на вопрос