¿Cómo comparto un observable con publicar y conectar?

Tengo un flujo de datos observable al que estoy aplicando operaciones, dividiéndome en dos flujos separados, aplicando más operaciones (distintas) a cada uno de los dos flujos, y fusionándolos nuevamente. Estoy tratando de compartir lo observable entre dos suscriptores usandoPublish yConnect pero cada uno de los suscriptores parece estar usando una secuencia separada. Es decir, en el ejemplo a continuación, veo "Hacer una operación costosa" impreso una vez para cada elemento en la secuenciapara los dos suscriptores. (Imagine que la operación costosa es algo que debería ocurrir solo una vez entre todos los suscriptores, como tal, estoy tratando de reutilizar la transmisión).Publish yConnect intentar compartir el observable combinado con ambos suscriptores, pero parece tener un efecto incorrecto.

Ejemplo con el problema:

var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
    // Converting to strings is an expensive operation
    Console.WriteLine("Doing an expensive operation");
    return string.Format("#{0}", i);
});

var a = expensive.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = expensive.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });

var connectable = Observable.Merge(a, b).Publish();
connectable.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
connectable.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));
connectable.Connect();

Veo el siguiente resultado:

Doing expensive operation
Doing expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing expensive operation
Doing expensive operation
Subscriber B got: { Source = B, Value = #1 }

(La salida continúa, truncada por brevedad).

¿Cómo puedo compartir lo observable con ambos suscriptores?

Respuestas a la pregunta(2)

Su respuesta a la pregunta