Чтение непрерывного потока потоков из потока с использованием TcpClient и Reactive Extensions

Рассмотрим следующий код:

internal class Program
{
    private static void Main(string[] args)
    {
        var client = new TcpClient();
        client.ConnectAsync("localhost", 7105).Wait();
        var stream = client.GetStream();
        var observable = stream.ReadDataObservable().Repeat();

        var s = from d in observable.Buffer(4)
                let headerLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt16(d.ToArray(), 2))
                let b = observable.Take(headerLength)
                select b.ToEnumerable().ToArray();
        s.Subscribe(a => Console.WriteLine("{0}", a));
        Console.ReadLine();
    }
}

public static class Extensions
{
    public static IObservable<byte> ReadDataObservable(this Stream stream)
    {
        return Observable.Defer(async () =>
        {
            var buffer = new byte[1024];
            var readBytes = await stream.ReadAsync(buffer, 0, buffer.Length);
            return buffer.Take(readBytes).ToObservable();
        });
    }
}

По сути, я хочу проанализировать полученные сообщения с помощью Reactive Extensions. Заголовок сообщения анализируется правильно с помощью Buffer (4), и я получаю длину оставшейся части сообщения. Проблема, которая возникает, заключается в том, что когда я делаю stream.Take (headerLength), код переоценивает всю «цепочку» и пытается получить новое сообщение из потока вместо того, чтобы возвращать оставшиеся байты, которые уже были прочитаны из потока , Точнее, первый ReadAsync (...) возвращает 38 байтов, Buffer (4) возвращает первые 4 из них, observable.Take (headerLength) не возвращает оставшиеся 34 байта, а вместо этого пытается прочитать новый сообщение с ReadAsync.

Вопрос в том, как я могу убедиться, что observable.Take (headerLength) получает уже прочитанные 34 байта и не пытается прочитать новое сообщение из потока? Я искал решение, но не могу понять, как этого добиться.

Изменить: это решение (Использование Reactive Extensions (Rx) для программирования сокетов практично?) это не то, что я ищу. Это не читает все доступное в потоке (вплоть до размера буфера) и делает непрерывный поток данных из него. Мне это решение не кажется очень эффективным способом чтения из потока, отсюда и мой вопрос.

Ответы на вопрос(2)

Ваш ответ на вопрос