С Rx, как я игнорирую значение «все, кроме самого последнего», когда мой метод подписки работает

С помощьюРеактивные расширенияЯ хочу игнорировать сообщения, поступающие из моего потока событий, которые происходят, пока мойSubscribe метод работает. То есть иногда обработка сообщения занимает больше времени, чем промежуток времени между сообщениями, поэтому я хочу отбросить сообщения, которые у меня нет времени на обработку.

Тем не менее, когда мойSubscribe Метод завершается, если какие-либо сообщения поступили, я хочу обработать последнее. Поэтому я всегда обрабатываю самое последнее сообщение.

Итак, если у меня есть код, который делает:

messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);

и если мы примем «100»; занимает много времени для обработки. Тогда я хочу, чтобы "2" быть обработанным, когда «100»; завершается. '1' следует игнорировать, потому что он был заменен '2' в то время как '100' все еще обрабатывался.

Вот пример результата, который я хочу использовать с помощью фоновой задачи иLatest()

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

Task.Factory.StartNew(() =>
{
    foreach(var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});

Однако Latest () является блокирующим вызовом, и я предпочел бы, чтобы поток не ожидал следующего значения, подобного этому (иногда между сообщениями будут очень большие промежутки).

Я также могу получить желаемый результат, используяBroadcastBlock отПоток данных TPL, как это:

var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));

buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });

но такое чувство, что это должно быть возможно прямо в Rx. Каков наилучший способ сделать это?

 Bryan Anderson13 июн. 2012 г., 21:59
Звучит как работа для Window & lt; T & gt; (), хотя кто-то может предложить более простое решение.
 Dave Hillier14 июн. 2012 г., 00:09
Ваши события должны быть созданы независимо от подписки.

Ответы на вопрос(9)

висимыми, наблюдая за пулом потоков, и я использовал тему, чтобы предоставить обратную связь о завершении задачи.

Я не думаю, что это простое решение, но я надеюсь, что оно может дать вам идеи для улучшения.

messages.
    Buffer(() => feedback).
    Select(l => l.LastOrDefault()).
    ObserveOn(Scheduler.ThreadPool).
    Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
        feedback.OnNext(Unit.Default);
    });

feedback.OnNext(Unit.Default);

Есть одна небольшая проблема - буфер сначала закрывается, когда он пуст, поэтому он генерирует значение по умолчанию. Возможно, вы могли бы решить эту проблему, выполнив обратную связь после первого сообщения.

Вот как функция расширения:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var feedback = new Subject<Unit>();

    var sub = source.
        Buffer(() => feedback).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l.LastOrDefault());
            feedback.OnNext(Unit.Default);
        });

    feedback.OnNext(Unit.Default);

    return sub;
}

И использование:

    messages.SubscribeWithoutOverlap(n =>
    {
        Thread.Sleep(1000);
        Console.WriteLine(n);
    });
 14 июн. 2012 г., 08:48
@yamen Это, наверное, разумно
 14 июн. 2012 г., 03:10
Не хочешьLastOrDefault вместоFirstOrDefault?

которое использует CAS вместо блокировок и избегает рекурсии. Код ниже, но вы можете найти полное объяснение здесь:http://www.zerobugbuild.com/?p=192

public static IObservable<TSource> ObserveLatestOn<TSource>(
    this IObservable<TSource> source,
    IScheduler scheduler)
{
    return Observable.Create<TSource>(observer =>
    {
        Notification<TSource> pending,Notification = null;
        var cancelable = new MultipleAssignmentDisposable();

        var sourceSubscription = source.Materialize()
            .Subscribe(notification =>
            {
                var previousNotification = Interlocked.Exchange(
                    ref pendingNotification, notification);

                if (previousNotification != null) return;

                cancelable.Disposable = scheduler.Schedule(() =>
                    {
                        var notificationToSend = Interlocked.Exchange(
                            ref pendingNotification, null);
                        notificationToSend.Accept(observer);
                    });
            });
            return new CompositeDisposable(sourceSubscription, cancelable);
    });
}
Решение Вопроса

Введение в Rx слава), теперь у меня есть рабочее решение с использованием этого метода расширения:

public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;
        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool alreadyActive;
            lock (gate)
            {
                alreadyActive = active;
                active = true;
                outsideNotification = thisNotification;
            }

            if (!alreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}
 07 сент. 2015 г., 06:00
Ах, это имеет смысл! Спасибо.
 Wilka06 сент. 2015 г., 16:11
@ AndrewHanlon, использующий Notification вместо просто значения, предназначен для работы с исключениями, в противном случае они не будут правильно передаваться по каналу OnError.
 06 сент. 2015 г., 15:50
Какова цельMaterializing и используяNotifications а не просто хранить само значение? Из моего тестирования кажется, что он работает так, как и ожидалось, чтобы отслеживать только ценность, но, возможно, мне не хватает некоторых основ.

Это не красиво, потому что это смешиваетTask а такжеObservableтак что это на самом деле не тестируется с использованиемReactiveTest (хотя, если честно, я не уверен, как я реализовал бы «медленного» абонента с помощьюReactiveTest или).

public static IObservable<T> ShedLoad<T>(this IObservable<T> source)
{
    return Observable.Create<T>(observer =>
    {
        Task task = Task.FromResult(0);
        return source.Subscribe(t =>
        {
            if(task.IsCompleted)
                task = Task.Run(() => observer.OnNext(t));
            else
                Debug.WriteLine("Skip, task not finished");
        }, observer.OnError, observer.OnCompleted);
    });
}

Я предполагаю, что там может быть состояние гонки, но, на мой взгляд, если мы находимся на стадии, когда мы отбрасываем вещи, потому что это происходит слишком быстро, я не возражаю бросить слишком много или слишком много. мало. Ох, и каждыйOnNext вызывается (потенциально) в другом потоке (думаю, я мог бы поставитьSynchronize на задней частиCreate).

Я признаю, что не смог получитьматериализовать расширение для правильной работы (я подключил его кFromEventPattern(MouseMove) и затем подписался с намеренно медленной подпиской, и странно, что он пропустит всплески событий, а не по одному за раз)

Sample вместо этого (что является более подходящим, чем буфер). Я включил метод расширения, аналогичный тому, который я добавил к ответу Дейва.

Расширение:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var sampler = new Subject<Unit>();

    var sub = source.
        Sample(sampler).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l);
            sampler.OnNext(Unit.Default);
        });

    // start sampling when we have a first value
    source.Take(1).Subscribe(_ => sampler.OnNext(Unit.Default));

    return sub;
}

Обратите внимание, что это проще, и здесь нет «пустых». буфер, который запущен. Первый элемент, который отправляется в действие, на самом деле происходит из самого потока.

Использование просто:

messages.SubscribeWithoutOverlap(n =>
{
    Console.WriteLine("start: " + n);
    Thread.Sleep(500);
    Console.WriteLine("end: " + n);
});

messages.Subscribe(x => Console.WriteLine("source: " + x)); // for testing

И результаты:

source: 0
start: 0
source: 1
source: 2
source: 3
source: 4
source: 5
end: 0
start: 5
source: 6
source: 7
source: 8
source: 9
source: 10
end: 5
start: 10
source: 11
source: 12
source: 13
source: 14
source: 15
end: 10
 22 авг. 2015 г., 16:03
не должен возвращатьсяIDisposable также позаботьтесь об утилизации внутреннегоSubject<Unit> ?
 08 апр. 2013 г., 12:26
Это имеет проблему в том, что если источник не поместил ничего в буфер семпла в точке, где вызывается sampler.OnNext, то система переходит в состояние, в котором она не будет генерировать больше значений. Я сделал вариант на этом, используя Switch вместо образцаstackoverflow.com/a/15876519/158285

Task реализация на основе семантики отмены, которая не использует тему. Вызов dispose позволяет подписанному действию отменить обработку, если это необходимо.

    public static IDisposable SampleSubscribe<T>(this IObservable<T> observable, Action<T, CancellationToken> action)
    {
        var cancellation = new CancellationDisposable();
        var token = cancellation.Token;
        Task task = null;

        return new CompositeDisposable(
            cancellation,
            observable.Subscribe(value =>
            {
                if (task == null || task.IsCompleted)
                    task = Task.Factory.StartNew(() => action(value, token), token);
            })
        );
    }

Вот простой тест:

Observable.Interval(TimeSpan.FromMilliseconds(150))
                      .SampleSubscribe((v, ct) =>
                      {   
                          //cbeck for cancellation, do work
                          for (int i = 0; i < 10 && !ct.IsCancellationRequested; i++)
                              Thread.Sleep(100);

                          Console.WriteLine(v);
                      });

Выход:

0
7
14
21
28
35

случай, когда вы выполняете задачу, но ничего нет в очереди.

using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace System.Reactive
{
    public static class RXX
    {
        public static IDisposable SubscribeWithoutOverlap<T>
        ( this IObservable<T> source
        , Action<T> action
        , IScheduler scheduler = null)
        {
            var sampler = new Subject<Unit>();
            scheduler = scheduler ?? Scheduler.Default;
            var p = source.Publish();
            var connection = p.Connect();

            var subscription = sampler.Select(x=>p.Take(1))
                .Switch()
                .ObserveOn(scheduler)
                .Subscribe(l =>
                {
                    action(l);
                    sampler.OnNext(Unit.Default);
                });

            sampler.OnNext(Unit.Default);

            return new CompositeDisposable(connection, subscription);
        }
    }
}
 Wilka17 апр. 2013 г., 14:08
В этом случае я не прояснил это в своем первоначальном вопросе. Я всегда хочу, чтобы новый элемент обрабатывался, поэтому, если он приходит, пока обрабатывается что-то еще, этот элемент должен быть обработан после завершения текущего (а не пропущенного).
 17 апр. 2013 г., 13:32
Он должен игнорировать элементы, которые попадают в очередь, когда он в данный момент обрабатывает. Я не уверен, что вы имеете в виду.
 Wilka17 апр. 2013 г., 12:50
Я только что заметил, что это может пропустить значения. То есть он не всегда обрабатывает самые последние значения в очереди, когда он уже что-то делает. напримерgist.github.com/WilkaH/5403360 только печатает "Готово 100", а не "Готово 2" впоследствии (1 должен быть отброшен, потому что он заменен)
 17 апр. 2013 г., 15:26
Требуется только небольшое изменение в приведенном выше коде, чтобы применить буфер из одного элемента. Упражнение для читателя perhapps? :)
 Wilka17 апр. 2013 г., 15:42
Я, очевидно, являюсь сегодня пустышкой, я пробовал буфер (и некоторые другие вещи), но я не мог понять, где я хочу буферизовать, чтобы заставить его работать.

Chunkify чтобы получить IEnumerable списков, каждый из которых содержит то, что наблюдалось с момента последнего MoveNext.

Вы можете использоватьToObservable преобразовать это обратно в IObservable и обратить внимание только на последнюю запись в каждом непустом списке.

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

messages.Chunkify()
        .ToObservable(Scheduler.TaskPool)
        .Where(list => list.Any())
        .Select(list => list.Last())
        .Subscribe(n =>
        {
          Thread.Sleep(TimeSpan.FromMilliseconds(250));
          Console.WriteLine(n);
        });
 24 мая 2013 г., 07:23
И это создает Список, полный значений, которые вы потенциально можете игнорировать. Расширение ObserveLatestOn избегает этого - нет списка, нет выделения при расширении списка, нет ссылок, поддерживающих старые уведомления.
 Wilka17 апр. 2013 г., 14:18
Это работает, но оставляет вращение потока, чтобы извлечь что-то из наблюдаемого (так что один из моих процессоров работает максимально)

ние проблемы, которое планирую использовать в производстве.

Если планировщик не использует текущий поток, вызовыOnNext, OnCompleted, OnError из источника должен немедленно вернуться; если наблюдатель занят предыдущими уведомлениями, он попадает в очередь с указанным максимальным размером, откуда он будет уведомляться всякий раз, когда было обработано предыдущее уведомление. Если очередь заполняется, наименее недавние элементы отбрасываются. Таким образом, максимальный размер очереди 0 игнорирует все элементы, поступающие, когда наблюдатель занят; размер 1 всегда позволит наблюдать за последним товаром; размер доint.MaxValue держит потребителя занятым, пока он не догонит производителя.

Если планировщик поддерживает длительный запуск (т. Е. Дает вам собственный поток), я планирую цикл для уведомления наблюдателя; в противном случае я использую рекурсивное планирование.

Вот код. Любые комментарии приветствуются.

partial class MoreObservables
{
    /// <summary>
    /// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process.
    /// </summary>
    /// <param name="source">The source sequence.</param>
    /// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param>
    /// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception>
    /// <remarks>
    /// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready.
    /// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any.
    /// To observe the whole source sequence, specify <see cref="int.MaxValue"/>.
    /// </remarks>
    public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (maxQueueSize < 0) throw new ArgumentOutOfRangeException(nameof(maxQueueSize));
        if (scheduler == null) scheduler = Scheduler.Default;

        return Observable.Create<TSource>(observer => LatestImpl<TSource>.Subscribe(source, maxQueueSize, scheduler, observer));
    }

    private static class LatestImpl<TSource>
    {
        public static IDisposable Subscribe(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));

            var longrunningScheduler = scheduler.AsLongRunning();
            if (longrunningScheduler != null)
                return new LoopSubscription(source, maxQueueSize, longrunningScheduler, observer);

            return new RecursiveSubscription(source, maxQueueSize, scheduler, observer);
        }

        #region Subscriptions

        /// <summary>
        /// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies in a loop.
        /// </summary>
        private sealed class LoopSubscription : IDisposable
        {
            private enum State
            {
                Idle, // nothing to notify
                Head, // next notification is in _head
                Queue, // next notifications are in _queue, followed by _completion
                Disposed, // disposed
            }

            private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();
            private readonly IObserver<TSource> _observer;
            private State _state;
            private TSource _head; // item in front of the queue
            private IQueue _queue; // queued items
            private Notification<TSource> _completion; // completion notification

            public LoopSubscription(IObservable<TSource> source, int maxQueueSize, ISchedulerLongRunning scheduler, IObserver<TSource> observer)
            {
                _observer = observer;
                _queue = Queue.Create(maxQueueSize);
                scheduler.ScheduleLongRunning(_ => Loop());
                _subscription.Disposable = source.Subscribe(
                    OnNext,
                    error => OnCompletion(Notification.CreateOnError<TSource>(error)),
                    () => OnCompletion(Notification.CreateOnCompleted<TSource>()));
            }

            private void OnNext(TSource value)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _head = value;
                            _state = State.Head;
                            Monitor.Pulse(_subscription);
                            break;
                        case State.Head:
                        case State.Queue:
                            if (_completion != null) return;
                            try { _queue.Enqueue(value); }
                            catch (Exception error) // probably OutOfMemoryException
                            {
                                _completion = Notification.CreateOnError<TSource>(error);
                                _subscription.Dispose();
                            }
                            break;
                    }
                }
            }

            private void OnCompletion(Notification<TSource> completion)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _completion = completion;
                            _state = State.Queue;
                            Monitor.Pulse(_subscription);
                            _subscription.Dispose();
                            break;
                        case State.Head:
                        case State.Queue:
                            if (_completion != null) return;
                            _completion = completion;
                            _subscription.Dispose();
                            break;
                    }
                }
            }

            public void Dispose()
            {
                lock (_subscription)
                {
                    if (_state == State.Disposed) return;

                    _head = default(TSource);
                    _queue = null;
                    _completion = null;
                    _state = State.Disposed;
                    Monitor.Pulse(_subscription);
                    _subscription.Dispose();
                }
            }

            private void Loop()
            {
                try
                {
                    while (true) // overall loop for all notifications
                    {
                        // next notification to emit
                        Notification<TSource> completion;
                        TSource next; // iff completion == null

                        lock (_subscription)
                        {
                            while (true)
                            {
                                while (_state == State.Idle)
                                    Monitor.Wait(_subscription);

                                if (_state == State.Head)
                                {
                                    completion = null;
                                    next = _head;
                                    _head = default(TSource);
                                    _state = State.Queue;
                                    break;
                                }
                                if (_state == State.Queue)
                                {
                                    if (!_queue.IsEmpty)
                                    {
                                        completion = null;
                                        next = _queue.Dequeue(); // assumption: this never throws
                                        break;
                                    }
                                    if (_completion != null)
                                    {
                                        completion = _completion;
                                        next = default(TSource);
                                        break;
                                    }
                                    _state = State.Idle;
                                    continue;
                                }
                                Debug.Assert(_state == State.Disposed);
                                return;
                            }
                        }

                        if (completion != null)
                        {
                            completion.Accept(_observer);
                            return;
                        }
                        _observer.OnNext(next);
                    }
                }
                finally { Dispose(); }
            }
        }

        /// <summary>
        /// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies recursively.
        /// </summary>
        private sealed class RecursiveSubscription : IDisposable
        {
            private enum State
            {
                Idle, // nothing to notify
                Scheduled, // emitter scheduled or executing
                Disposed, // disposed
            }

            private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();
            private readonly MultipleAssignmentDisposable _emitter = new MultipleAssignmentDisposable(); // scheduled emit action
            private readonly IScheduler _scheduler;
            private readonly IObserver<TSource> _observer;
            private State _state;
            private IQueue _queue; // queued items
            private Notification<TSource> _completion; // completion notification

            public RecursiveSubscription(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer)
            {
                _scheduler = scheduler;
                _observer = observer;
                _queue = Queue.Create(maxQueueSize);
                _subscription.Disposable = source.Subscribe(
                    OnNext,
                    error => OnCompletion(Notification.CreateOnError<TSource>(error)),
                    () => OnCompletion(Notification.CreateOnCompleted<TSource>()));
            }

            private void OnNext(TSource value)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _emitter.Disposable = _scheduler.Schedule(value, EmitNext);
                            _state = State.Scheduled;
                            break;
                        case State.Scheduled:
                            if (_completion != null) return;
                            try { _queue.Enqueue(value); }
                            catch (Exception error) // probably OutOfMemoryException
                            {
                                _completion = Notification.CreateOnError<TSource>(error);
                                _subscription.Dispose();
                            }
                            break;
                    }
                }
            }

            private void OnCompletion(Notification<TSource> completion)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _completion = completion;
                            _emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(completion));
                            _state = State.Scheduled;
                            _subscription.Dispose();
                            break;
                        case State.Scheduled:
                            if (_completion != null) return;
                            _completion = completion;
                            _subscription.Dispose();
                            break;
                    }
                }
            }

            public void Dispose()
            {
                lock (_subscription)
                {
                    if (_state == State.Disposed) return;

                    _emitter.Dispose();
                    _queue = null;
                    _completion = null;
                    _state = State.Disposed;
                    _subscription.Dispose();
                }
            }

            private void EmitNext(TSource value, Action<TSource> self)
            {
                try { _observer.OnNext(value); }
                catch { Dispose(); return; }

                lock (_subscription)
                {
                    if (_state == State.Disposed) return;
                    Debug.Assert(_state == State.Scheduled);
                    if (!_queue.IsEmpty)
                        self(_queue.Dequeue());
                    else if (_completion != null)
                        _emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(_completion));
                    else
                        _state = State.Idle;
                }
            }

            private void EmitCompletion(Notification<TSource> completion)
            {
                try { completion.Accept(_observer); }
                finally { Dispose(); }
            }
        }

        #endregion

        #region IQueue

        /// <summary>
        /// FIFO queue that discards least recent items if size limit is reached.
        /// </summary>
        private interface IQueue
        {
            bool IsEmpty { get; }
            void Enqueue(TSource item);
            TSource Dequeue();
        }

        /// <summary>
        /// <see cref="IQueue"/> implementations.
        /// </summary>
        private static class Queue
        {
            public static IQueue Create(int maxSize)
            {
                switch (maxSize)
                {
                    case 0: return Zero.Instance;
                    case 1: return new One();
                    default: return new Many(maxSize);
                }
            }

            private sealed class Zero : IQueue
            {
                // ReSharper disable once StaticMemberInGenericType
                public static Zero Instance { get; } = new Zero();
                private Zero() { }

                public bool IsEmpty => true;
                public void Enqueue(TSource item) { }
                public TSource Dequeue() { throw new InvalidOperationException(); }
            }

            private sealed class One : IQueue
            {
                private TSource _item;

                public bool IsEmpty { get; private set; } = true;

                public void Enqueue(TSource item)
                {
                    _item = item;
                    IsEmpty = false;
                }

                public TSource Dequeue()
                {
                    if (IsEmpty) throw new InvalidOperationException();

                    var item = _item;
                    _item = default(TSource);
                    IsEmpty = true;
                    return item;
                }
            }

            private sealed class Many : IQueue
            {
                private readonly int _maxSize, _initialSize;
                private int _deq, _enq; // indices of deque and enqueu positions
                private TSource[] _buffer;

                public Many(int maxSize)
                {
                    if (maxSize < 2) throw new ArgumentOutOfRangeException(nameof(maxSize));

                    _maxSize = maxSize;
                    if (maxSize == int.MaxValue)
                        _initialSize = 4;
                    else
                    {
                        // choose an initial size that won't get us too close to maxSize when doubling
                        _initialSize = maxSize;
                        while (_initialSize >= 7)
                            _initialSize = (_initialSize + 1) / 2;
                    }
                }

                public bool IsEmpty { get; private set; } = true;

                public void Enqueue(TSource item)
                {
                    if (IsEmpty)
                    {
                        if (_buffer == null) _buffer = new TSource[_initialSize];
                        _buffer[0] = item;
                        _deq = 0;
                        _enq = 1;
                        IsEmpty = false;
                        return;
                    }
                    if (_deq == _enq) // full
                    {
                        if (_buffer.Length == _maxSize) // overwrite least recent
                        {
                            _buffer[_enq] = item;
                            if (++_enq == _buffer.Length) _enq = 0;
                            _deq = _enq;
                            return;
                        }

                        // increse buffer size
                        var newSize = _buffer.Length >= _maxSize / 2 ? _maxSize : 2 * _buffer.Length;
                        var newBuffer = new TSource[newSize];
                        var count = _buffer.Length - _deq;
                        Array.Copy(_buffer, _deq, newBuffer, 0, count);
                        Array.Copy(_buffer, 0, newBuffer, count, _deq);
                        _deq = 0;
                        _enq = _buffer.Length;
                        _buffer = newBuffer;
                    }
                    _buffer[_enq] = item;
                    if (++_enq == _buffer.Length) _enq = 0;
                }

                public TSource Dequeue()
                {
                    if (IsEmpty) throw new InvalidOperationException();

                    var result = ReadAndClear(ref _buffer[_deq]);
                    if (++_deq == _buffer.Length) _deq = 0;
                    if (_deq == _enq)
                    {
                        IsEmpty = true;
                        if (_buffer.Length > _initialSize) _buffer = null;
                    }
                    return result;
                }

                private static TSource ReadAndClear(ref TSource item)
                {
                    var result = item;
                    item = default(TSource);
                    return result;
                }
            }
        }

        #endregion
    }
}
 16 окт. 2016 г., 21:18
Добро пожаловать. Хотя я и не делал этого из чистого альтруизма, я надеюсь, что кто-то также найдет это полезным (и поможет мне получить 50 репутации, чтобы я, по крайней мере, мог комментировать сообщения)
 15 окт. 2016 г., 14:30
Вау, это много хорошо документированных вещей. Хотя я просто рецензирую и не интересуюсь этой темой, я думаю, что должен поблагодарить вас за то, что вы привели здесь хороший кусок работы.

Ваш ответ на вопрос