RxJava: "java.lang.IllegalStateException: ¡solo se permite un suscriptor!"

Estoy usando RxJava para calcular la correlación automática normalizada sobre algunos datos del sensor en Android. Por extraño que parezca, mi código arroja una excepción ("java.lang.IllegalStateException: ¡solo se permite un suscriptor!") Y no estoy seguro de qué hacer: sé que GroupedObservables podría lanzar esta excepción cuando se suscriba a mis múltiples suscriptores, pero No creo que esté usando tal cosa en ningún lado.

A continuación encontrará el método que (muy probablemente) desencadena la excepción:

public Observable<Float> normalizedAutoCorrelation(Observable<Float> observable, final int lag) {
    Observable<Float> laggedObservable = observable.skip(lag);

    Observable<Float> meanObservable = mean(observable, lag);
    Observable<Float> laggedMeanObservable = mean(laggedObservable, lag);

    Observable<Float> standardDeviationObservable = standardDeviation(observable, meanObservable, lag);
    Observable<Float> laggedStandardDeviationObservable = standardDeviation(laggedObservable, laggedMeanObservable, lag);

    Observable<Float> deviation = observable.zipWith(meanObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float value, Float mean) {
            return value - mean;
        }
    });

    Observable<Float> laggedDeviation = observable.zipWith(laggedMeanObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float value, Float mean) {
            return value - mean;
        }
    });

    Observable<Float> autoCorrelationPartObservable = deviation.zipWith(laggedDeviation, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float value, Float laggedValue) {
            return value * laggedValue;
        }
    });

    Observable<Float> autoCorrelationObservable = flatten(autoCorrelationPartObservable.window(lag, 1).scan(new Func2<Observable<Float>, Observable<Float>, Observable<Float>>() {
        @Override
        public Observable<Float> call(Observable<Float> memoObservable, Observable<Float> observable) {
            if(memoObservable == null) return observable;

            return memoObservable.zipWith(observable, new Func2<Float, Float, Float>() {
                @Override
                public Float call(Float memo, Float value) {
                    return memo + value;
                }
            });
        }
    }));

    Observable<Float> normalizationObservable = standardDeviationObservable.zipWith(laggedStandardDeviationObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float standardDeviation, Float laggedStandardDeviation) {
            return lag * standardDeviation * laggedStandardDeviation;
        }
    });

    return autoCorrelationObservable.zipWith(normalizationObservable, new Func2<Float, Float, Float>() {
        @Override
        public Float call(Float autoCorrelation, Float normalization) {
            return autoCorrelation / normalization;
        }
    });
}

Y este es el stacktrace que obtengo:

java.lang.IllegalStateException: Only one subscriber allowed!
at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:124)
at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:81)
at rx.Observable.unsafeSubscribe(Observable.java:7304)
at rx.internal.operators.OperatorZip$Zip.start(OperatorZip.java:210)
at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:154)
at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:120)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:41)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:30)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable.unsafeSubscribe(Observable.java:7304)
at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:188)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:158)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:93)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.OperatorScan$2.onNext(OperatorScan.java:110)
at rx.internal.operators.OperatorWindowWithSize$InexactSubscriber.onNext(OperatorWindowWithSize.java:173)
at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:255)
at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:326)
at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:255)
at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:326)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:635)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
  at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
  at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:98)
  at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45)
  at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59)
  at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121)
  at rx.internal.operators.BufferUntilSubscriber.onCompleted(BufferUntilSubscriber.java:161)
  at rx.internal.operators.OperatorWindowWithSize$InexactSubscriber.onNext(OperatorWindowWithSize.java:183)
  at rx.internal.operators.OperatorSkip$1.onNext(OperatorSkip.java:58)
  at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
  at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
  at rx.subjects.PublishSubject.onNext(PublishSubject.java:121)
  at com.github.joopaue.smartphonesensing.SensorService$3.onSensorChanged(SensorService.java:102)
  at android.hardware.SystemSensorManager$SensorEventQueue.dispatchSensorEvent(SystemSensorManager.java:418)
  at android.os.MessageQueue.nativePollOnce(Native Method)
  at android.os.MessageQueue.next(MessageQueue.java:138)
  at android.os.Looper.loop(Looper.java:123)
  at android.app.ActivityThread.main(ActivityThread.java:5146)
  at java.lang.reflect.Method.invokeNative(Native Method)
  at java.lang.reflect.Method.invoke(Method.java:515)
  at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:732)
  at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:566)
  at dalvik.system.NativeStart.main(Native Method)

No creo que esté haciendo nada extraño aquí: algunas cremalleras, reducciones, escaneos y mapas planos.

¿Me estoy perdiendo algo completamente obvio, hay alguna regla oculta que estoy rompiendo aquí o es un error en RxJava? ¡Gracias!

PD. Si le falta algún código para que pueda sacar sus conclusiones, ¡solo pregunte y lo publicaré!

Respuestas a la pregunta(1)

Su respuesta a la pregunta