Идиоматический подход к остановке производителей / потребителей? - PullRequest
0 голосов
/ 25 сентября 2018

В попытке получить хорошую практику параллельного программирования я пытаюсь реализовать шаблон «производитель / потребитель», используя библиотеку Clojure core.async.Все работает хорошо, но я хотел иметь возможность остановить как производителя, так и потребителя в какой-то момент времени.

Мой текущий код выглядит примерно так ...

(def c (a/chan 5))
(def alive (atom true))

(def producer (a/go-loop []
                (Thread/sleep 1000)
                (when @alive 
                  (a/>! c 1)
                  (recur))))

(def consumer (a/go-loop []
                (Thread/sleep 3000)
                (when @alive 
                  (println (a/<! c))
                  (recur))))
(do
  (reset! alive false)
  (a/<!! producer)
  (a/<!! consumer))

К сожалению, этоПохоже, что блок 'do' иногда блокируется на неопределенный срок.По сути, я хочу иметь возможность остановить продолжение обоих циклов и блокировать их до тех пор, пока оба цикла не завершатся.Код Thread / sleep предназначен для имитации выполнения какой-либо единицы работы.

Я подозреваю, что остановка производителя заставляет потребителя парковаться, отсюда и зависание, хотя я не уверен в альтернативном подходе, любые идеи

1 Ответ

0 голосов
/ 25 сентября 2018

Пожалуйста, см. ClojureDocs для получения подробной информации.Пример:

(let [c (chan 2) ]
  (>!! c 1)
  (>!! c 2)
  (close! c)
  (println (<!! c)) ; 1
  (println (<!! c)) ; 2
  ;; since we closed the channel this will return false(we can no longer add values)
  (>!! c 1))

Для вашей задачи что-то вроде:

  (let [c        (a/chan 5)
        producer (a/go-loop [cnt 0]
                   (Thread/sleep 1000)
                   (let [put-result (a/>! c cnt)]
                     (println "put: " cnt put-result)
                     (when put-result
                       (recur (inc cnt)))))

        consumer (a/go-loop []
                   (Thread/sleep 3000)
                   (let [result (a/<! c)]
                     (when result
                       (println "take: " result)
                       (recur))))]
    (Thread/sleep 5000)
    (println "closing chan...")
    (a/close! c))

с результатом

put:  0 true
put:  1 true
take:  0
put:  2 true
put:  3 true
closing chan...
put:  4 false
take:  1
take:  2
take:  3
...