Clojure агенты, потребляющие из очереди - PullRequest
26 голосов
/ 08 апреля 2010

Я пытаюсь выяснить, как лучше использовать агенты для получения элементов из очереди сообщений (Amazon SQS). Прямо сейчас у меня есть функция (process-queue-item), которая отбирает элементы из очереди и обрабатывает их.

Я хочу обрабатывать эти элементы одновременно, но не могу понять, как управлять агентами. По сути, я хочу, чтобы все агенты были заняты как можно больше, не вытягивая много элементов из очереди и не создавая задел действительно нужны).

Кто-нибудь может дать мне несколько советов по улучшению моей реализации?

(def active-agents (ref 0))

(defn process-queue-item [_]
  (dosync (alter active-agents inc))
  ;retrieve item from Message Queue (Amazon SQS) and process
  (dosync (alter active-agents dec)))

(defn -main []
  (def agents (for [x (range 20)] (agent x)))

  (loop [loop-count 0]

    (if (< @active-agents 20)
      (doseq [agent agents]
        (if (agent-errors agent)
          (clear-agent-errors agent))
        ;should skip this agent until later if it is still busy processing (not sure how)
        (send-off agent process-queue-item)))

    ;(apply await-for (* 10 1000) agents)
    (Thread/sleep  10000)
    (logging/info (str "ACTIVE AGENTS " @active-agents))
    (if (> 10 loop-count)
      (do (logging/info (str "done, let's cleanup " count))
       (doseq [agent agents]
         (if (agent-errors agent)
           (clear-agent-errors agent)))
       (apply await agents)
       (shutdown-agents))
      (recur (inc count)))))

Ответы [ 4 ]

23 голосов
/ 12 апреля 2010
(let [switch (atom true) ; a switch to stop workers
      workers (doall 
                (repeatedly 20 ; 20 workers pulling and processing items from SQS
                  #(future (while @switch 
                             (retrieve item from Amazon SQS and process)))))]
  (Thread/sleep 100000) ; arbitrary rule to decide when to stop ;-)
  (reset! switch false) ; stop !
  (doseq [worker workers] @worker)) ; waiting for all workers to be done
6 голосов
/ 09 апреля 2010

То, что вы просите, - это способ продолжать раздавать задания, но с некоторым верхним пределом. Один простой подход к этому - использовать семафор для координации предела. Вот как бы я подошел к этому:

(let [limit (.availableProcessors (Runtime/getRuntime))
      ; note: you might choose limit 20 based upon your problem description
      sem (java.util.concurrent.Semaphore. limit)]
  (defn submit-future-call
    "Takes a function of no args and yields a future object that will
    invoke the function in another thread, and will cache the result and
    return it on all subsequent calls to deref/@. If the computation has
    not yet finished, calls to deref/@ will block. 
    If n futures have already been submitted, then submit-future blocks
    until the completion of another future, where n is the number of
    available processors."  
    [#^Callable task]
    ; take a slot (or block until a slot is free)
    (.acquire sem)
    (try
      ; create a future that will free a slot on completion
      (future (try (task) (finally (.release sem))))
      (catch java.util.concurrent.RejectedExecutionException e
        ; no task was actually submitted
        (.release sem)
        (throw e)))))

(defmacro submit-future
  "Takes a body of expressions and yields a future object that will
  invoke the body in another thread, and will cache the result and
  return it on all subsequent calls to deref/@. If the computation has
  not yet finished, calls to deref/@ will block.
  If n futures have already been submitted, then submit-future blocks
  until the completion of another future, where n is the number of
  available processors."  
  [& body] `(submit-future-call (fn [] ~@body)))

#_(example
    user=> (submit-future (reduce + (range 100000000)))
    #<core$future_call$reify__5782@6c69d02b: :pending>
    user=> (submit-future (reduce + (range 100000000)))
    #<core$future_call$reify__5782@38827968: :pending>
    user=> (submit-future (reduce + (range 100000000)))
    ;; blocks at this point for a 2 processor PC until the previous
    ;; two futures complete
    #<core$future_call$reify__5782@214c4ac9: :pending>
    ;; then submits the job

Теперь, когда вам это нужно, вам просто нужно координировать выполнение самих задач. Похоже, у вас уже есть механизмы для этого. Цикл (submit-future (process-queue-item))

4 голосов
/ 09 апреля 2010

Возможно, вы могли бы использовать функцию seque? Цитирование (doc seque):

clojure.core/seque
([s] [n-or-q s])
  Creates a queued seq on another (presumably lazy) seq s. The queued
  seq will produce a concrete seq in the background, and can get up to
  n items ahead of the consumer. n-or-q can be an integer n buffer
  size, or an instance of java.util.concurrent BlockingQueue. Note
  that reading from a seque can block if the reader gets ahead of the
  producer.

Я имею в виду ленивую последовательность получения элементов очереди по сети; Вы обернули бы это в seque, поместили бы это в Ссылку, и рабочие Агенты потребляли предметы из этого seque. seque возвращает что-то, что выглядит как обычный seq с точки зрения вашего кода, а магия очереди происходит прозрачным образом. Обратите внимание, что если последовательность, которую вы помещаете внутрь, является чанковой, то она все равно будет принудительно чанковой за один раз. Также обратите внимание, что первоначальный вызов самого seque, кажется, блокируется до тех пор, пока не будет получен исходный элемент или два (или чанк, в зависимости от случая; я думаю, что это больше связано с тем, как работают ленивые последовательности, чем с самим seque). хотя).

Набросок кода ( действительно схематичный, совсем не тестировался):

(defn get-queue-items-seq []
  (lazy-seq
   (cons (get-queue-item)
         (get-queue-items-seq))))

(def task-source (ref (seque (get-queue-items-seq))))

(defn do-stuff []
  (let [worker (agent nil)]
    (if-let [result
             (dosync
               (when-let [task (first @task-source)]
                (send worker (fn [_] (do-stuff-with task)))))]
      (do (await worker)
          ;; maybe do something with worker's state
          (do-stuff))))) ;; continue working

(defn do-lots-of-stuff []
  (let [fs (doall (repeatedly 20 #(future (do-stuff))))]
    fs)))

На самом деле вы, вероятно, захотите более сложного производителя элемента очереди seq, чтобы вы могли попросить его прекратить производство новых элементов (необходимость, если все это должно быть в состоянии изящно завершиться; фьючерсы умрут когда источник задачи иссякнет, используйте future-done?, чтобы увидеть, сделали ли они это уже). И это только то, что я вижу на первый взгляд ... Я уверен, что здесь есть еще кое-что для полировки. Я думаю, что общий подход будет работать, хотя.

0 голосов
/ 19 июня 2012

Не уверен, насколько это идиоматично, так как я все еще новичок с языком, но мне подходит следующее решение:

(let [number-of-messages-per-time 2
      await-timeout 1000]
  (doseq [p-messages (partition number-of-messages-per-time messages)]
    (let [agents (map agent p-messages)]
      (doseq [a agents] (send-off a process))
      (apply await-for await-timeout agents)
      (map deref agents))))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...