Wie schränke ich ein, dass ein Akka Stream nur einmal pro Sekunde ausgeführt und gesendet wird?

Ich habe einen Akka-Stream und möchte, dass der Stream ungefähr jede Sekunde Nachrichten nach unten sendet.

Ich habe zwei Möglichkeiten ausprobiert, um dieses Problem zu lösen. Die erste Möglichkeit bestand darin, den Produzenten zu Beginn des Streams zu veranlassen, nur einmal pro Sekunde Nachrichten zu senden, wenn eine Continue-Nachricht in diesen Akteur eingeht.

// When receive a Continue message in a ActorPublisher // do work then... if (totalDemand > 0) { import scala.concurrent.duration._ context.system.scheduler.scheduleOnce(1 second, self, Continue) }

Dies funktioniert für eine kurze Zeit, dann erscheint im ActorPublisher-Darsteller eine Flut von Fortsetzungsnachrichten. Ich gehe davon aus, dass (erraten, aber nicht sicher) Nachrichten vom Downstream über den Gegendruck angefordert werden, da der Downstream schnell verbrauchen kann, der Upstream jedoch nicht schnell produziert Bewertung. Also ist diese Methode fehlgeschlagen.

Die andere Möglichkeit, die ich ausprobierte, war die Regelung des Gegendrucks. Ich verwendete einMaxInFlightRequestStrategy auf derActorSubscriber am Ende des Streams, um die Anzahl der Nachrichten auf 1 pro Sekunde zu begrenzen. Dies funktioniert, aber die eingehenden Nachrichten kommen ungefähr um drei Uhr und nicht nur um eine Uhr. Es sieht so aus, als würde die Rückstaukontrolle nicht sofort die Rate der eingehenden Nachrichten ändern, oder Nachrichten, die sich bereits in der Warteschlange des Streams befinden und auf ihre Verarbeitung warten.

Also ist das Problem, wie kann ich einen Akka Stream haben, der nur eine Nachricht pro Sekunde verarbeiten kann?

Ich entdeckte, dassMaxInFlightRequestStrategy ist eine gültige Methode, aber ich sollte die Stapelgröße auf 1 setzen, die Standard-Stapelgröße ist 5, was das von mir gefundene Problem verursacht hat. Es ist auch eine überkomplizierte Möglichkeit, das Problem zu lösen, nachdem ich mir die hier eingereichte Antwort angesehen habe.

Antworten auf die Frage(2)

Ihre Antwort auf die Frage