Clojure Agenten verbrauchen aus einer Warteschlange

Ich versuche herauszufinden, wie Sie mithilfe von Agents Elemente aus einer Message Queue (Amazon SQS) verwenden können. Im Moment habe ich eine Funktion (process-queue-item), die ein Element aus der Warteschlange holt und verarbeitet.

Ich möchte diese Elemente gleichzeitig verarbeiten, kann mich aber nicht mit der Steuerung der Agenten befassen. Grundsätzlich möchte ich alle Agenten so weit wie möglich beschäftigen, ohne auf viele Elemente aus der Warteschlange zuzugreifen und ein Backlog zu entwickeln (dies wird auf einigen Computern ausgeführt, sodass die Elemente in der Warteschlange verbleiben müssen, bis sie in der Warteschlange sind wirklich gebraucht werden).

Kann mir jemand Hinweise zur Verbesserung meiner Implementierung geben?

(def active-agents (ref 0))

(defn process-queue-item [_]
  (dosync (alter active-agents inc))
  ;retrieve item from Message Queue (Amazon SQS) and process
  (dosync (alter active-agents dec)))

(defn -main []
  (def agents (for [x (range 20)] (agent x)))

  (loop [loop-count 0]

    (if (< @active-agents 20)
      (doseq [agent agents]
        (if (agent-errors agent)
          (clear-agent-errors agent))
        ;should skip this agent until later if it is still busy processing (not sure how)
        (send-off agent process-queue-item)))

    ;(apply await-for (* 10 1000) agents)
    (Thread/sleep  10000)
    (logging/info (str "ACTIVE AGENTS " @active-agents))
    (if (> 10 loop-count)
      (do (logging/info (str "done, let's cleanup " count))
       (doseq [agent agents]
         (if (agent-errors agent)
           (clear-agent-errors agent)))
       (apply await agents)
       (shutdown-agents))
      (recur (inc count)))))

Antworten auf die Frage(4)

Ihre Antwort auf die Frage