Как ограничить выполнение Akka Stream и отправку одного сообщения только один раз в секунду?

У меня есть Akka Stream, и я хочу, чтобы поток отправлял сообщения вниз по течению примерно каждую секунду.

Я попробовал два способа решения этой проблемы, первый из них заключался в том, чтобы заставить производителя в начале потока отправлять сообщения только раз в секунду, когда в этот субъект поступает сообщение Continue.

// When receive a Continue message in a ActorPublisher // do work then... if (totalDemand > 0) { import scala.concurrent.duration._ context.system.scheduler.scheduleOnce(1 second, self, Continue) }

Это работает на короткое время, после чего в акторе ActorPublisher появляется поток сообщений Continue, я полагаю (предположим, но не уверен) из нисходящего потока с помощью сообщений запроса обратного давления, поскольку нисходящий поток может потреблять быстро, но восходящий поток не производит с высокой скоростью , Так что этот метод не удался.

Другой способ, которым я пытался, был через контроль противодавления, я использовалMaxInFlightRequestStrategy наActorSubscriber в конце потока, чтобы ограничить количество сообщений до 1 в секунду. Это работает, но поступающие сообщения приходят примерно три или около того за один раз, а не только по одному за раз. Кажется, управление противодавлением не сразу меняет частоту сообщений, приходящих в ИЛИ, сообщения уже были поставлены в очередь в потоке и ожидают обработки.

Итак, проблема в том, как я могу иметь Akka Stream, который может обрабатывать одно сообщение только в секунду?

Я обнаружил, чтоMaxInFlightRequestStrategy это правильный способ сделать это, но я должен установить размер пакета 1, его размер пакета по умолчанию 5, что стало причиной проблемы, которую я обнаружил. Кроме того, это слишком сложный способ решения проблемы сейчас, когда я смотрю на представленный ответ здесь.

Ответы на вопрос(1)

Ваш ответ на вопрос