Wie teile ich ein Observable mit Publish und Connect?

Ich habe einen beobachtbaren Datenstrom, auf den ich Operationen anwende, der sich in zwei separate Streams aufteilt, mehr (unterschiedliche) Operationen auf jeden der beiden Streams anwendet und wieder zusammenführt. Ich versuche, die beobachtbare zwischen zwei Abonnenten mit @ zu teilPublish undConnect aber jeder der Abonnenten scheint einen separaten Stream zu verwenden. Das heißt, im folgenden Beispiel wird "Einen teuren Vorgang ausführen" für jedes Element im Stream @ einmal gedruckfür beide Abonnenten. (Stellen Sie sich den teuren Vorgang als etwas vor, das nur einmal zwischen allen Abonnenten passieren sollte. Daher versuche ich, den Stream wiederzuverwenden.) Ich habe @ verwendePublish undConnect, um zu versuchen, das zusammengeführte Observable mit beiden Abonnenten zu teilen, aber es scheint den falschen Effekt zu haben.

Beispiel mit dem Problem:

var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
    // Converting to strings is an expensive operation
    Console.WriteLine("Doing an expensive operation");
    return string.Format("#{0}", i);
});

var a = expensive.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = expensive.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });

var connectable = Observable.Merge(a, b).Publish();
connectable.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
connectable.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));
connectable.Connect();

Ich sehe die folgende Ausgabe:

Doing expensive operation
Doing expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing expensive operation
Doing expensive operation
Subscriber B got: { Source = B, Value = #1 }

(Die Ausgabe wird aus Gründen der Kürze fortgesetzt.)

Wie kann ich das Observable mit beiden Abonnenten teilen?

Antworten auf die Frage(4)

Ihre Antwort auf die Frage