Производитель потребитель с квалификацией - PullRequest
21 голосов
/ 03 мая 2010

Я новичок в clojure и пытаюсь понять, как правильно использовать его функции параллелизма, поэтому любая критика / предложения приветствуются. Поэтому я пытаюсь написать небольшую тестовую программу в clojure, которая работает следующим образом:

  1. там 5 производителей и 2 потребителя
  2. производитель ожидает случайное время и затем помещает число в общую очередь.
  3. потребитель должен извлечь номер из очереди, как только очередь не пуста, а затем ненадолго поспать, чтобы имитировать выполнение работы
  4. потребители должны блокировать, когда очередь пуста
  5. производители должны блокировать, когда в очереди более 4 элементов, чтобы предотвратить ее увеличение

Вот мой план на каждый шаг выше:

  1. производители и потребители будут агентами, которые на самом деле не заботятся о своем состоянии (просто нулевые значения или что-то в этом роде); я просто использую агентов для отправки функции «потребителя» или «производителя» в какое-то время. Тогда общая очередь будет (def queue (ref [])). Возможно, это должен быть атом?
  2. в функции агента «продюсер», просто (Thread / sleep (rand-int 1000)) и затем (dosync (alter queue coe (rand-int 100)))) для передачи в очередь.
  3. Я думаю, чтобы агенты-потребители следили за изменениями с помощью add-watcher. Хотя в этом нет уверенности ... это разбудит потребителей при любых изменениях, даже если изменение произошло от потребителя, который что-то стряхнул (возможно, оставив пустым). Возможно, проверки этого в функции наблюдателя достаточно. Я вижу еще одну проблему: если все потребители заняты, то что происходит, когда производитель добавляет что-то новое в очередь? Наблюдаемое событие попадает в очередь какого-либо агента-пользователя или оно исчезает?
  4. см. Выше
  5. Я действительно не знаю, как это сделать. Я слышал, что применение clojure может быть полезным, но я не смог найти достаточно документов о том, как его использовать, и мое первоначальное тестирование, похоже, не сработало (извините, у меня больше нет кода)

1 Ответ

24 голосов
/ 03 мая 2010

Вот мое мнение. Я хотел использовать только структуры данных Clojure, чтобы увидеть, как это сработает. Обратите внимание, что было бы совершенно обычным и идиоматичным взять очередь блокировки из панели инструментов Java и использовать ее здесь; я думаю, код будет легко адаптировать. Обновление: Я действительно адаптировал его к java.util.concurrent.LinkedBlockingQueue, см. Ниже.

clojure.lang.PersistentQueue

Позвоните (pro-con), чтобы начать тестовый прогон; затем посмотрите на содержимое output, чтобы увидеть, что-нибудь произошло, и queue-lengths, чтобы увидеть, остались ли они в пределах заданной границы.

Обновление: Чтобы объяснить, почему я почувствовал необходимость использовать ensure ниже (меня об этом спрашивали в IRC), это должно предотвратить перекос записи (см. Статью в Википедии о Снимок экрана) изоляция для определения). Если бы я заменил @queue на (ensure queue), два или более производителей могли бы проверить длину очереди, обнаружить, что она меньше 4, затем поместить дополнительные элементы в очередь и, возможно, вывести общую длину очередь выше 4, нарушая ограничение. Аналогично, два потребителя, выполняющие @queue, могут принять один и тот же элемент для обработки, а затем вытолкнуть два элемента из очереди. ensure предотвращает любой из этих сценариев.

(def go-on? (atom true))
(def queue (ref clojure.lang.PersistentQueue/EMPTY))
(def output (ref ()))
(def queue-lengths (ref ()))
(def *max-queue-length* 4)

(defn overseer
  ([] (overseer 20000))
  ([timeout]
     (Thread/sleep timeout)
     (swap! go-on? not)))

(defn queue-length-watch [_ _ _ new-queue-state]
  (dosync (alter queue-lengths conj (count new-queue-state))))

(add-watch queue :queue-length-watch queue-length-watch)

(defn producer [tag]
  (future
   (while @go-on?
     (if (dosync (let [l (count (ensure queue))]
                   (when (< l *max-queue-length*)
                     (alter queue conj tag)
                     true)))
       (Thread/sleep (rand-int 2000))))))

(defn consumer []
  (future
   (while @go-on?
     (Thread/sleep 100)       ; don't look at the queue too often
     (when-let [item (dosync (let [item (first (ensure queue))]
                               (alter queue pop)
                               item))]
       (Thread/sleep (rand-int 500))         ; do stuff
       (dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
  (reset! go-on? true)
  (dorun (map #(%1 %2)
              (repeat 5 producer)
              (iterate inc 0)))
  (dorun (repeatedly 2 consumer))
  (overseer))

java.util.concurrent.LinkedBlockingQueue

Версия выше написана с использованием LinkedBlockingQueue. Обратите внимание, что общая схема кода в основном одинакова, а некоторые детали на самом деле немного чище. Я удалил queue-lengths из этой версии, так как LBQ позаботился об этом ограничении для нас.

(def go-on? (atom true))
(def *max-queue-length* 4)
(def queue (java.util.concurrent.LinkedBlockingQueue. *max-queue-length*))
(def output (ref ()))

(defn overseer
  ([] (overseer 20000))
  ([timeout]
     (Thread/sleep timeout)
     (swap! go-on? not)))

(defn producer [tag]
  (future
   (while @go-on?
     (.put queue tag)
     (Thread/sleep (rand-int 2000)))))

(defn consumer []
  (future
   (while @go-on?
     ;; I'm using .poll on the next line so as not to block
     ;; indefinitely if we're done; note that this has the
     ;; side effect that nulls = nils on the queue will not
     ;; be handled; there's a number of other ways to go about
     ;; this if this is a problem, see docs on LinkedBlockingQueue
     (when-let [item (.poll queue)]
       (Thread/sleep (rand-int 500)) ; do stuff
       (dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
  (reset! go-on? true)
  (dorun (map #(%1 %2)
              (repeat 5 producer)
              (iterate inc 0)))
  (dorun (repeatedly 2 consumer))
  (overseer))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...