MailboxProcessor, который работает с логикой LIFO

Я узнаю об агентах F # (MailboxProcessor).

Я имею дело с довольно нетрадиционной проблемой.

У меня есть один агент (dataSource) который является источником потоковых данных. Данные должны быть обработаны массивом агентов (dataProcessor). Мы можем рассмотретьdataProcessor как какое-то устройство слежения.Данные могут передаваться быстрее, чем скорость, с которойdataProcessor может быть в состоянии обработать его ввод.Это нормально, чтобы иметь некоторую задержку. Тем не менее, я должен убедиться, что агент остается на вершине своей работы и не накапливается под устаревшими наблюдениями

Я изучаю способы решения этой проблемы.

первая идея это реализоватьстек (LIFO) в.dataSourcedataSource послал бы по последнему доступному наблюдению, когдаdataProcessor становится доступным для получения и обработки данных. Это решение может работать, но может усложниться, так какdataProcessor возможно, потребуется заблокировать и повторно активировать; и сообщить свой статусdataSource, что приводит к проблеме двусторонней связи. Эта проблема может сводиться кblocking queue впроблема потребителя-производителя но я не уверен..

вторая идея это иметьdataProcessor заботиться о сортировке сообщений. В этой архитектуреdataSource будет просто публиковать обновления в 'dataProcessorОчередьdataProcessor буду использоватьScanчтобы получить последние данные, доступные в его очереди. Это может быть путь. Тем не менее, я не уверен, если в текущем дизайнеMailboxProcessorможно очистить очередь сообщений, удалив старые устаревшие. Более того,Вотнаписано что:

К сожалению, функция TryScan в текущей версии F # сломана двумя способами. Во-первых, весь смысл в том, чтобы указать время ожидания, но реализация фактически не соблюдает его. В частности, неактуальные сообщения сбрасывают таймер. Во-вторых, как и в случае с другой функцией сканирования, очередь сообщений проверяется под блокировкой, которая не позволяет другим потокам отправлять сообщения на время сканирования, которое может быть сколь угодно долгим. Следовательно, сама функция TryScan имеет тенденцию блокировать параллельные системы и может даже вводить взаимоблокировки, потому что вызывающая сторонаКод оценивается внутри блокировки (например, отправка аргумента функции в Scan или TryScan может заблокировать агент, когда код под блокировкой блокировки ожидает получения блокировки, под которой он уже находится).

Проблемы с последним наблюдением могут быть проблемой. Автор этого поста @Jon Harrop предполагает, что

Мне удалось создать архитектуру, и полученная архитектура была на самом деле лучше. По сути я с нетерпениемReceive все сообщения и фильтр, используя мою собственную локальную очередь.

Эта идея, безусловно, заслуживает изучения, но, прежде чем начать играть с кодом, я хотел бы получить некоторые сведения о том, как я могу структурировать свое решение.

Спасибо.