Mesclando dados históricos e vivos de preços de ações com Rx

Estou experimentando o Rx porque parece um bom ajuste para o nosso domínio, mas a curva de aprendizado me pegou de surpresa.

Preciso juntar dados de preços históricos com dados de preços reais.

Estou tentando adaptar a abordagem usual para fazer isso na linguagem do Rx:

Assine os preços ao vivo imediatamente e comece a armazenar em buffer os valores que recebo de voltaInicie uma solicitação de dados de preços históricos (isso precisa acontecer após a assinatura dos preços ativos, para que não tenhamos lacunas em nossos dados)Publique os preços históricos conforme eles retornamDepois de recebermos todos os dados históricos, publique os dados ativos em buffer, removendo quaisquer valores que se sobreponham aos nossos dados históricos no inícioContinuar repetindo os dados do feed de preço ao vivo

Eu tenho esse código de cara de palha nojento e incorreto que parece funcionar para os casos de teste ingênuos que eu escrevi:

IConnectableObservable<Tick> live = liveService
    .For(symbol)
    .Replay(/* Some appropriate buffer size */);
live.Connect();

IObservable<Tick> historical = historyService.For(since, symbol);

return new[] {historical, live}
    .Concat()
    .Where(TicksAreInChronologicalOrder());

private static Func1<Tick,bool> TicksAreInChronologicalOrder()
{
    // Some stateful predicate comparing the timestamp of this tick 
    // to the timestamp of the last tick we saw
}

Isso tem algumas desvantagens

O tamanho do buffer de reprodução apropriado não é conhecido. Definir um buffer ilimitado não é possível, esta é uma seqüência de longa duração. Realmente queremos algum tipo de buffer de uma única vez que flushes na primeira chamada para se inscrever. Se isso existir no Rx, não consigo encontrá-lo.O buffer de reprodução continuará existindo mesmo depois de mudarmos para publicar os preços ao vivo. Nós não precisamos do buffer neste momento.Da mesma forma, o predicado para filtrar os ticks sobrepostos não é necessário, uma vez que pulamos a sobreposição inicial entre os preços históricos e os preços ao vivo. Eu realmente quero fazer algo como:live.SkipWhile(tick => tick.Timestamp < /* lazily get last timestamp in historical data */). ÉWait(this IObservable<TSource>) útil aqui?

Deve haver uma maneira melhor de fazer isso, mas eu ainda estou esperando que meu cérebro troque Rx como se fosse FP.

Outra opção que considerei resolver 1. é escrever minha própria extensão Rx, que seria umaISubject que enfileira as mensagens até obter seu primeiro assinante (e recusa os assinantes depois disso?). Talvez seja esse o caminho a percorrer?

questionAnswers(4)

yourAnswerToTheQuestion