¿Cómo limitar un Akka Stream para ejecutar y enviar un mensaje solo una vez por segundo?

Tengo un Akka Stream y quiero que el flujo envíe mensajes de descarga aproximadamente cada segundo.

Intenté dos formas de resolver este problema, la primera fue hacer que el productor al comienzo de la transmisión solo enviara mensajes una vez cada segundo cuando llega un mensaje Continuar a este actor.

// When receive a Continue message in a ActorPublisher // do work then... if (totalDemand > 0) { import scala.concurrent.duration._ context.system.scheduler.scheduleOnce(1 second, self, Continue) }

Esto funciona por un corto tiempo, luego aparece una avalancha de mensajes Continuar en el actor ActorPublisher, supongo (adivino pero no estoy seguro) desde abajo a través de mensajes de solicitud de contrapresión ya que el flujo descendente puede consumir rápido pero el flujo ascendente no está produciendo a una velocidad rápida . Entonces este método falló.

La otra forma que probé fue a través del control de contrapresión, usé unMaxInFlightRequestStrategy sobre elActorSubscriber al final de la transmisión para limitar el número de mensajes a 1 por segundo. Esto funciona, pero los mensajes que llegan llegan aproximadamente a las tres más o menos a la vez, no solo uno a la vez. Parece que el control de contrapresión no cambia de inmediato la velocidad de los mensajes entrantes O los mensajes ya estaban en cola en la secuencia y esperando ser procesados.

Entonces, el problema es, ¿cómo puedo tener un Akka Stream que pueda procesar un mensaje solo por segundo?

Descubrí queMaxInFlightRequestStrategy es una forma válida de hacerlo, pero debería establecer el tamaño del lote en 1, su tamaño de lote es el predeterminado 5, lo que estaba causando el problema que encontré. También es una forma demasiado complicada de resolver el problema ahora que estoy viendo la respuesta presentada aquí.

Respuestas a la pregunta(1)

Su respuesta a la pregunta