Não repetindo quente observável

Pergunta original

Eu tenho um cenário onde eu tenho váriosIObservable seqüências que eu quero combinar comMerge e então escute. No entanto, se um deles produzir um erro, não quero que ele quebre tudo para os outros fluxos, bem como para assinar novamente a sequência (essa é uma sequência "sempre duradoura").

Eu faço isso adicionando umRetry() aos fluxos antes de mesclar, ou seja:

IEnumerable<IObservable<int>> observables = GetObservables();

observables
    .Select(o => o.Retry())
    .Merge()
    .Subscribe(/* Do subscription stuff */);

No entanto, o problema surge quando quero testar isso. O que eu gostaria de testar é que, se um dosIObservablepecadoobservables produz umOnError, os outros ainda devem ser capazes de enviar seus valores e eles devem ser manipulados

Eu pensei em usar apenas doisSubject<int>s representando doisIObservablepecadoobservables; um enviando umOnError(new Exception()) e o outro, depois disso, enviandoOnNext(1). No entanto, pareceSubject<int> irá repetir todos os valores anteriores para uma nova assinatura (que efetivamenteRetry() é), transformando o teste em um loop infinito.

Eu tentei resolver isso criando um manualIObservable que produz um erro na primeira assinatura e depois uma sequência vazia, mas parece hacky:

var i = 0;
var nErrors = 2;
var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create<int>(o => {
    i++;
    if (i < nErrors) {
        return Observable.Throw<int>(new Exception()).Subscribe(o);
    } else {
        return Observable.Empty<int>().Subscribe(o);
    }
});

Estou usandoSubject ou pensando emRetry() do jeito errado? Mais alguma ideia sobre isso? Como você resolveria essa situação?

Atualizar

Ok, aqui está um diagrama de mármore do que eu quero epensar Retry() faz.

o = message, X = error.
------o---o---X
               \
     Retry() -> \---o---o---X
                             \
                   Retry() -> \...

Meu problema talvez seja mais porque não tenho uma boa classe de ações para usar os testes preliminares, uma vez queSubject quer repetir todos os meus erros anteriores.

Atualização 2

Aqui está um caso de teste que mostra o que quero dizer sobreSubject repetindo seus valores. Estou usando o termo corretamente se eu disser que faz isso em umfrio caminho? eu seiSubject é uma maneira de criar um hot observável, mas ainda assim este comportamento parece "frio" para mim.

var onNext = false;
var subject = new Subject<int>();

subject.Retry().Subscribe(x => onNext = true);
subject.OnError(new Exception());
subject.OnNext(1);

Assert.That(onNext, Is.True);

questionAnswers(1)

yourAnswerToTheQuestion