Zusammenführen historischer und aktueller Kursdaten mit Rx

Ich probiere Rx aus, weil es für unsere Domäne passend zu sein scheint, aber die Lernkurve hat mich überrascht.

Ich muss historische Kursdaten mit Live-Kursdaten verknüpfen.

Ich versuche, den üblichen Ansatz, dies zu tun, in die Sprache von Rx umzuwandeln:

Abonnieren Sie sofort die Live-Preise und fangen Sie an, die zurückgegebenen Werte zwischenzuspeichernInitiieren Sie eine Anfrage für historische Preisdaten (dies muss nach dem Abonnieren von Live-Preisen geschehen, damit wir keine Lücken in unseren Daten haben).Veröffentlichen Sie historische Preise, sobald sie wieder verfügbar sindSobald wir alle historischen Daten erhalten haben, veröffentlichen Sie die gepufferten Live-Daten und entfernen Sie alle Werte, die sich zu Beginn mit unseren historischen Daten überschneidenSetzen Sie die Wiedergabe der Daten aus dem Live-Preis-Feed fort

Ich habe diesen ekelhaften und falschen Strohmann-Code, der für die naiven Testfälle zu funktionieren scheint, die ich geschrieben habe:

IConnectableObservable<Tick> live = liveService
    .For(symbol)
    .Replay(/* Some appropriate buffer size */);
live.Connect();

IObservable<Tick> historical = historyService.For(since, symbol);

return new[] {historical, live}
    .Concat()
    .Where(TicksAreInChronologicalOrder());

private static Func1<Tick,bool> TicksAreInChronologicalOrder()
{
    // Some stateful predicate comparing the timestamp of this tick 
    // to the timestamp of the last tick we saw
}

Dies hat einige Nachteile

Die entsprechende Größe des Wiedergabepuffers ist nicht bekannt. Das Setzen eines unbegrenzten Puffers ist nicht möglich - dies ist eine lange Sequenz. Eigentlich wollen wir eine Art einmaligen Puffer, der beim ersten Aufruf von Subscribe leer wird. Wenn dies in Rx vorhanden ist, kann ich es nicht finden.Der Wiederholungspuffer bleibt auch dann bestehen, wenn wir auf die Veröffentlichung von Live-Preisen umgestellt haben. Wir brauchen den Puffer an dieser Stelle nicht.Ebenso ist das Prädikat zum Herausfiltern überlappender Ticks nicht erforderlich, nachdem die anfängliche Überlappung zwischen historischen und Live-Preisen übersprungen wurde. Ich möchte wirklich etwas machen wie:live.SkipWhile(tick => tick.Timestamp < /* lazily get last timestamp in historical data */). IstWait(this IObservable<TSource>) nützlich hier?

Es muss einen besseren Weg geben, dies zu tun, aber ich warte immer noch darauf, dass mein Gehirn Rx wie FP grockt.

Eine andere Option, die ich in Betracht gezogen habe, um 1. zu lösen, ist das Schreiben meiner eigenen Rx-Erweiterung, die eine wäreISubject Das stellt Nachrichten in die Warteschlange, bis es seinen ersten Abonnenten erhält (und lehnt Abonnenten danach ab?). Vielleicht ist das der richtige Weg?

Antworten auf die Frage(4)

Ihre Antwort auf die Frage