Вот мое мнение. Я хотел использовать только структуры данных Clojure, чтобы увидеть, как это сработает. Обратите внимание, что было бы совершенно обычным и идиоматичным взять очередь блокировки из панели инструментов Java и использовать ее здесь; я думаю, код будет легко адаптировать. Обновление: Я действительно адаптировал его к java.util.concurrent.LinkedBlockingQueue
, см. Ниже.
clojure.lang.PersistentQueue
Позвоните (pro-con)
, чтобы начать тестовый прогон; затем посмотрите на содержимое output
, чтобы увидеть, что-нибудь произошло, и queue-lengths
, чтобы увидеть, остались ли они в пределах заданной границы.
Обновление: Чтобы объяснить, почему я почувствовал необходимость использовать ensure
ниже (меня об этом спрашивали в IRC), это должно предотвратить перекос записи (см. Статью в Википедии о Снимок экрана) изоляция для определения). Если бы я заменил @queue
на (ensure queue)
, два или более производителей могли бы проверить длину очереди, обнаружить, что она меньше 4, затем поместить дополнительные элементы в очередь и, возможно, вывести общую длину очередь выше 4, нарушая ограничение. Аналогично, два потребителя, выполняющие @queue
, могут принять один и тот же элемент для обработки, а затем вытолкнуть два элемента из очереди. ensure
предотвращает любой из этих сценариев.
(def go-on? (atom true))
(def queue (ref clojure.lang.PersistentQueue/EMPTY))
(def output (ref ()))
(def queue-lengths (ref ()))
(def *max-queue-length* 4)
(defn overseer
([] (overseer 20000))
([timeout]
(Thread/sleep timeout)
(swap! go-on? not)))
(defn queue-length-watch [_ _ _ new-queue-state]
(dosync (alter queue-lengths conj (count new-queue-state))))
(add-watch queue :queue-length-watch queue-length-watch)
(defn producer [tag]
(future
(while @go-on?
(if (dosync (let [l (count (ensure queue))]
(when (< l *max-queue-length*)
(alter queue conj tag)
true)))
(Thread/sleep (rand-int 2000))))))
(defn consumer []
(future
(while @go-on?
(Thread/sleep 100) ; don't look at the queue too often
(when-let [item (dosync (let [item (first (ensure queue))]
(alter queue pop)
item))]
(Thread/sleep (rand-int 500)) ; do stuff
(dosync (alter output conj item)))))) ; and let us know
(defn pro-con []
(reset! go-on? true)
(dorun (map #(%1 %2)
(repeat 5 producer)
(iterate inc 0)))
(dorun (repeatedly 2 consumer))
(overseer))
java.util.concurrent.LinkedBlockingQueue
Версия выше написана с использованием LinkedBlockingQueue
. Обратите внимание, что общая схема кода в основном одинакова, а некоторые детали на самом деле немного чище. Я удалил queue-lengths
из этой версии, так как LBQ
позаботился об этом ограничении для нас.
(def go-on? (atom true))
(def *max-queue-length* 4)
(def queue (java.util.concurrent.LinkedBlockingQueue. *max-queue-length*))
(def output (ref ()))
(defn overseer
([] (overseer 20000))
([timeout]
(Thread/sleep timeout)
(swap! go-on? not)))
(defn producer [tag]
(future
(while @go-on?
(.put queue tag)
(Thread/sleep (rand-int 2000)))))
(defn consumer []
(future
(while @go-on?
;; I'm using .poll on the next line so as not to block
;; indefinitely if we're done; note that this has the
;; side effect that nulls = nils on the queue will not
;; be handled; there's a number of other ways to go about
;; this if this is a problem, see docs on LinkedBlockingQueue
(when-let [item (.poll queue)]
(Thread/sleep (rand-int 500)) ; do stuff
(dosync (alter output conj item)))))) ; and let us know
(defn pro-con []
(reset! go-on? true)
(dorun (map #(%1 %2)
(repeat 5 producer)
(iterate inc 0)))
(dorun (repeatedly 2 consumer))
(overseer))