Быстрое повторение TakeWhile вызывает бесконечный цикл

Как я могу сделать следующее наблюдаемое повторение, пока stream.DataAvailable не станет ложным? В настоящее время похоже, что это никогда не останавливается.

AsyncReadChunk и Observable.Return внутри секции Defer вызывают OnNext, а затем OnCompleted. Когда Repeat получает вызов OnNext, он передает его в TakeWhile. Когда TakeWhile не удовлетворен, он завершает наблюдаемое, но я думаю, что OnCompleted, который появляется сразу после OnNext, настолько быстр, что заставляет Repeat повторно подписаться на наблюдаемое и вызывает бесконечный цикл.

Как я могу исправить это поведение?

public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize)
{
    return Observable.Defer(() =>
        {
            try
            {
                return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0]);
            }
            catch (Exception)
            {
                return Observable.Return(new byte[0]);
            }
        })
        .Repeat()
        .TakeWhile((dataChunk, index) => dataChunk.Length > 0);
}

Ответы на вопрос(1)

Ваш ответ на вопрос