Clojure Производитель Потребитель - PullRequest
1 голос
/ 19 января 2012

Я изучаю clojure и пробую его параллелизм и эффективность на примере производителя.

Сделал это, и было неудобно использовать ref и deref, а также смотреть и снимать часы.

Я пытался проверить другой фрагмент кода; но есть ли лучший способ пересмотреть это, кроме как использовать методы Java Condition await () и signal () вместе с Java Lock. Я не хотел ничего использовать в Java.

вот код; Я думаю, я бы сделал много ошибок здесь с моим использованием ...

;a simple producer class
(ns my.clojure.producer
    (:use  my.clojure.consumer)
  (:gen-class)
  )

 (def tasklist( ref (list) )) ;this is declared as a global variable; to make this 
    ;mutable we need to use the fn ref


(defn gettasklist[]
    (deref tasklist) ;we need to use deref fn to return the task list
  )

(def testagent (agent 0)); create an agent


(defn emptytasklist[akey aref old-val new-val]

   (doseq [item (gettasklist)]
        (println(str "item is") item)
        (send testagent consume item)
        (send testagent increment item)
     )
      (. java.lang.Thread sleep 1000)
   (dosync ; adding a transaction for this is needed to reset

      (remove-watch tasklist "key123"); removing the watch on the tasklist so that it does not
                                       ; go to a recursive call
      (ref-set tasklist (list ) ) ; we need to make it as a ref to reassign
      (println (str "The number of tasks now remaining is=")  (count (gettasklist)))

     )
  (add-watch tasklist "key123" emptytasklist)
 )
(add-watch tasklist "key123" emptytasklist)

  (defn addtask [task] 
    (dosync ; adding a transaction for this is needed to refset
      ;(println (str "The number of tasks before") (count (gettasklist)))
      (println (str "Adding a task") task)
      (ref-set tasklist (conj (gettasklist) task )) ; we need to make it as a ref to reassign
      ;(println (str "The number of tasks after") (count (gettasklist)))
     )
  )

Вот код потребителя

(ns my.clojure.consumer
  )
(defn consume[c item]

  (println  "In the consume method:Item is " c item  )
  item 
)
(defn increment [c n] 
  (println "parmeters are" c n)
  (+ c n)
  )

А вот тестовый код (я использовал maven для запуска кода clojure и использовал NetBeans для редактирования, так как это мне более знакомо, исходя из Java - структура папок и pom в - https://github.com/alexcpn/clojure-evolve

(ns my.clojure.Testproducer
        (:use my.clojure.producer)
        (:use clojure.test)
      (:gen-class)
  )

(deftest test-addandcheck

  (addtask 1)
  (addtask 2)
  (is(= 0 (count (gettasklist))))
   (println (str "The number of tasks are") (count (gettasklist)))

 )

Если кто-нибудь сможет слегка изменить этот факт, чтобы я мог прочитать и понять код, тогда это будет здорово; В остальном, я думаю, мне придется больше узнать

Редактировать -1

Я полагаю, использование глобального списка задач и предоставление его другим функциям путем разыменования его (deref) и повторного создания его изменяемым с помощью ref не является подходом в clojure;

Таким образом, изменив метод addTask для прямой отправки входящих задач агенту

(defn addtask [task] 
    (dosync ; adding a transaction for this is needed to refset

      (println (str "Adding a task") task)

        ;(ref-set tasklist (conj (gettasklist) task )) ; we need to make it as a ref to reassign
       (def testagent (agent 0)); create an agent
       (send testagent consume task)
       (send testagent increment task)

     )

Однако, когда я проверял это

(deftest test-addandcheck
  (loop [task 0]
    (when ( < task 100)
        (addtask task)
      (recur (inc task))))

  (is(= 0 (count (gettasklist))))
   (println (str "The number of tasks are") (count (gettasklist)))

 )

через некоторое время я получаю исключение из-за отклонения выполнения Java - это нормально, если вы выполняете потоки Java, потому что вы получаете полный контроль. Но из clojure это выглядит странно, тем более что вы сами не выбираете стратегию ThreadPool

Adding a task 85
Exception in thread "pool-1-thread-4" java.util.concurrent.RejectedExecutionExce
ption
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution
(ThreadPoolExecutor.java:1759)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.jav
a:767)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.ja
va:658)
        at clojure.lang.Agent$Action.execute(Agent.java:56)
        at clojure.lang.Agent$Action.doRun(Agent.java:95)
        at clojure.lang.Agent$Action.run(Agent.java:106)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExec
utor.java:885)Adding a task 86
Adding a task 87

Ответы [ 4 ]

4 голосов
/ 19 января 2012

Я думаю, что моделирование производителя и потребителя в ближайшем будущем будет сделано проще всего (и наиболее эффективно) с использованием каналов lamina .

1 голос
/ 19 января 2012

Я создал аналогичную программу clojure с шаблоном производителя / потребителя в Размеры вычисляемой папки асинхронно .

Я не думаю, что вам действительно нужно использовать ссылки.У вас есть один список задач, который является изменяемым состоянием, которое вы хотите изменить синхронно .Изменения относительно быстрые и не зависят от какого-либо другого внешнего состояния, только от переменной, которая поступает к потребителю.

Что касается атомов, то есть функция swap! , которая может помочь вам выполнитьменяет то, что, как я понял, вам нужно.

Вы можете посмотреть на мой фрагмент Размер вычислительной папки Я думаю, что это может по крайней мере показать вам правильное использование атомов и агентов.Я много играл с ним, так что должно быть правильно.

Надеюсь, это поможет!

С уважением, Ян

0 голосов
/ 25 мая 2013

Я сталкивался с несколькими случаями использования, когда мне нужна способность:

  • строго контролирует количество рабочих потоков как на стороне производителя, так и на стороне потребителя
  • контролирует максимальный размер "рабочей очереди", чтобы ограничить потребление памяти
  • определить, когда все работы завершены, чтобы я мог остановить рабочих

Я обнаружил, что встроенные функции параллелизма clojure (хотя они удивительно просты и полезны сами по себе), затрудняют первые два пункта. lamina выглядит великолепно, но я не нашел способа, чтобы он отвечал моим конкретным случаям использования без такого же рода дополнительных действий, которые я должен был бы выполнить для реализации, основанной на BlockingQueue.

Итак, я решил собрать простую библиотеку clojure, чтобы попытаться решить мои проблемы. По сути, это просто оболочка вокруг BlockingQueue, которая пытается скрыть некоторые конструкции Java и предоставить высокоуровневый API производителя-потребителя. Я еще не совсем доволен API; скорее всего, он будет развиваться немного дальше ... но он работает:

https://github.com/cprice-puppet/freemarket

Пример использования:

(def myproducer (producer producer-work-fn num-workers max-work))
(def myconsumer (consumer myproducer consumer-work-fn num-workers max-results))
(doseq [result (work-queue->seq (:result-queue myconsumer))]
  (println result))

Обратная связь / предложения / материалы будут приветствоваться!

0 голосов
/ 04 января 2013

Я смотрел примеры Clojure для шторма в Твиттере. Он просто использовал LinkedBlockingQueue . Это простой в использовании, одновременный и хорошо работает. Конечно, ему не хватает сексуальной привлекательности неизменного решения Clojure, но оно будет хорошо работать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...