Конвейеры, мультиплексирование и неограниченная буферизация

(ПРИМЕЧАНИЕ: ям. .Net 4,не .Net 4.5, поэтому я не могу использовать TPL 'Классы DataflowBlock.)

TL; DR версия

В конечном счете, яЯ просто искал способ обработки последовательных рабочих элементов с использованием нескольких потоков таким образом, чтобы сохранить их порядок в конечном выводе, не требуя неограниченного выходного буфера.

мотивация

У меня есть существующий код, чтобы обеспечить многопоточный механизм для обработки нескольких блоков данных, где один поток, связанный с вводом / выводом ("поставщик») отвечает за постановку в очередь блоков данных для обработки. Эти блоки данных составляют рабочие элементы.

Одна или несколько тем ("процессоры») отвечают за снятие с очереди по одному рабочему элементу за один раз, который они обрабатывают, а затем записывают обработанные данные в очередь вывода, прежде чем снимать с очереди их следующий рабочий элемент.

Последний поток ввода-вывода ("потребитель») отвечает за удаление завершенных рабочих элементов из очереди вывода и запись их в конечный пункт назначения. Эти рабочие элементы (и должны быть) написаны в том же порядке, в котором они были поставлены в очередь. Я реализовал это, используя параллельную очередь приоритетов, где приоритет каждого элемента определяется его исходным индексом.

используя эту схему, чтобы выполнить некоторое пользовательское сжатие в большом потоке данных, где само сжатие является относительно медленным, но чтение несжатых данных и запись сжатых данных относительно быстры (хотя и связаны с вводом / выводом).

Я обрабатываю данные довольно большими порциями порядка 64 КБ, поэтому издержки конвейера относительно невелики.

Мое текущее решение работает хорошо, но в нем много пользовательского кода, написанного 6 лет назад с использованием многих событий синхронизации, и дизайн кажется несколько неуклюжим; поэтому я приступил к академическим упражнениям, чтобы посмотреть, можно ли их переписать, используя более современные библиотеки .Net.

Новый дизайн

Мой новый дизайн используетBlockingCollection класс, и основан на несколькоэта статья Microsoft.

В частности, посмотрите на раздел, озаглавленныйБалансировка нагрузки с использованием нескольких производителей, Я попытался использовать этот подход, и поэтому у меня есть несколько задач обработки, каждая из которых берет рабочие элементы из общего ввода BlockingCollection и записывает свои завершенные элементы в свою собственную очередь вывода BlockingCollection.

Поскольку каждая задача обработки имеет свою собственную очередь вывода, яя пытаюсь использоватьBlockingCollection.TakeFromAny() снять в очередь первый доступный завершенный рабочий элемент.

Проблема мультиплексора

Пока все хорошо, но сейчас возникает проблема. В статье Microsoft говорится:

Пробелы являются проблемой. Следующий этап конвейера, этап «Отображение изображения», должен показывать изображения по порядку и без пропусков в последовательности. Вот где приходит мультиплексор. Используя метод TakeFromAny, мультиплексор ожидает ввода из обеих очередей производителей каскада фильтра. Когда изображение прибывает, мультиплексор смотрит, если изображение 'порядковый номер s является следующим в ожидаемой последовательности. Если это так, мультиплексор передает его на этап отображения изображения. Если изображение не является следующим в последовательности, мультиплексор хранит значение во внутреннем буфере предварительного просмотра и повторяет операцию получения для входной очереди, которая не имеет значения предварительного просмотра. Этот алгоритм позволяет мультиплексору собирать входные данные из входящих очередей производителей таким образом, чтобы обеспечить последовательный порядок без сортировки значений.

Итак, что происходит, так это то, что задачи обработки могут производить готовые изделия практически в любом порядке. Мультиплексор отвечает за вывод этих элементов в правильном порядке.

Тем не мение...

Представьте, что у нас есть 1000 предметов для обработки. Далее представьте себе, что по какой-то странной причине самый первый элемент занимает больше времени для обработки, чем все остальные элементы вместе взятые.

Используя мою текущую схему, мультиплексор будет продолжать чтение и буферизацию элементов из всех выходных очередей обработки, пока не найдет следующую,должен выводить. Так как предмет, который его ждет, является (согласно моему "представь если выше) появится только после того, как ВСЕ другие рабочие элементы будут обработаны, я буду эффективно буферизовать все рабочие элементы во всем вводе!

Объем данных слишком велик, чтобы это произошло. Мне нужно, чтобы задачи обработки не могли выводить завершенные рабочие элементы, когда очередь вывода достигла определенного максимального размера (т.е.s ограниченная очередь вывода) ЕСЛИ МЕНЮ не работает тот рабочий элемент, который ожидает мультиплексор.

И этогде яЯ немного застрял. Я могу придумать много способов реализовать это на практике, но все они кажутся слишком сложными, поскольку они не лучше, чем код, который ядумаю поменять!

Какие'мой вопрос?

Мой вопрос: правильно ли я поступаю?

Я бы подумал, что это будет хорошо понятая проблема, но мое исследование выявило только статьи, которые, кажется, игнорируют проблему неограниченной буферизации, которая возникает, если рабочий элемент занимает очень много времени по сравнению со всеми другими рабочими элементами.

Может кто-нибудь указать мне какие-либо статьи, которые описывают разумный способ достижения этого?

TL; DR версия

В конечном счете, яЯ просто искал способ обработки последовательных рабочих элементов с использованием нескольких потоков таким образом, чтобы сохранить их порядок в конечном выводе, не требуя неограниченного выходного буфера.

Ответы на вопрос(4)

Ваш ответ на вопрос