Синхронизация потоков с Clojure - PullRequest
0 голосов
/ 12 сентября 2018

Я получил упражнение:

  • Печатайте по порядку все натуральные числа от 1 до 100.

  • Используя блоки, семафоры или другие подобные механизмы (но избегая спящего режима), координируйте два потока так, чтобы объединенный вывод из обоих потоков отображался в числовом порядке.

     Sample Output
        In thread one: The number is ‘1’
        In thread two: The number is ‘2’
        In thread one: The number is ‘3’
        In thread two: The number is ‘4’
    

Это упражнение предназначено для Ruby, но я хотел бы показать своему классу, что Clojure может быть хорошим вариантом для этой задачи.

У меня нет опыта работы с потоками на любом языке, но я подумал использовать что-то вроде:

 (def thread_1 (future (swap! my-atom inc) ))
 (def thread_2 (future (swap! my-atom inc) ))

но @ thread_1 всегда возвращает одно и то же значение. Есть ли способ согласования двух потоков в Clojure?

Я нашел этот пример в Java, используя ReentrantLock и Condition, и теперь я пытаюсь перевести его на Clojure.

Ответы [ 3 ]

0 голосов
/ 13 сентября 2018

Вот способ, как правило, для координации нескольких потоков, работающих в одном и том же состоянии, с помощью агента :

(def my-agent (agent 1 :validator #(<= % 100)))
(future
  (while true
    (send my-agent
          (fn [i]
            (println "In thread" (.getName (Thread/currentThread))
                     "the number is:" i)
            (inc i)))))
In thread clojure-agent-send-pool-4 the number is: 1
In thread clojure-agent-send-pool-5 the number is: 2
In thread clojure-agent-send-pool-5 the number is: 3
In thread clojure-agent-send-pool-4 the number is: 4
In thread clojure-agent-send-pool-4 the number is: 5
In thread clojure-agent-send-pool-4 the number is: 6

Вы можете увидеть то же самое важное поведение здесь с * 1006 или без него* потому что send немедленно возвращается во внутренний цикл, и функция sent может выполняться в разных потоках в пуле.Агент заботится о координации доступа к общему состоянию.

Обновление: вот еще один способ сделать то же самое, что не включает в себя функцию :validator или прерывание по исключению:

(def my-agent (agent (range 1 101)))
(while (seq @my-agent)
  (send
    my-agent
    (fn [[n & ns]]
      (when n
        (println "In thread" (.getName (Thread/currentThread))
                 "the number is:" n)
        ns))))
0 голосов
/ 13 сентября 2018

Если порядок потоков имеет значение, и если вам интересно узнать о неклассическом обмене потоками, вы можете использовать clojure.core.async и использовать «каналы».

(require '[clojure.core.async :as a])

(let [chan-one (a/chan 1)
      chan-two (a/chan 1)]
  (a/>!! chan-one 1)
  (doseq [[thread in out] [["one" chan-one chan-two]
                           ["two" chan-two chan-one]]]
    (a/go-loop []
      (when-let [n (a/<! in)]
        (if (> n 10)
          (do (a/close! in)
              (a/close! out))
          (do (prn (format "In thread %s: The number is `%s`" thread n))
              (a/>! out (inc n))
              (recur)))))))

Выходные данные

"In thread one: The number is `1`"
"In thread two: The number is `2`"
"In thread one: The number is `3`"
"In thread two: The number is `4`"
"In thread one: The number is `5`"
"In thread two: The number is `6`"
"In thread one: The number is `7`"
"In thread two: The number is `8`"
"In thread one: The number is `9`"
"In thread two: The number is `10`"

Один из моментов здесь заключается в том, что подпрограммы go выполняются в пуле потоков, поэтому они не являются выделенными потоками, и если вы хотите использовать реальные потоки, вы должны пойти следующим образом:

(require '[clojure.core.async :as a])

(let [chan-one (a/chan 1)
      chan-two (a/chan 1)]
  (a/>!! chan-one 1)
  (doseq [[thread in out] [["one" chan-one chan-two]
                           ["two" chan-two chan-one]]]
    (a/thread
      (loop []
        (when-let [n (a/<!! in)]
          (if (> n 10)
            (do (a/close! in)
                (a/close! out))
            (do (prn (format "In thread %s: The number is `%s`" thread n))
                (a/>!! out (inc n))
                (recur))))))))
0 голосов
/ 12 сентября 2018

Во-первых, причина, по которой вы видите странные результаты, заключается в том, что вы разыменовываете future, а не атом, содержащий число. Будущее вернет результат swap! при разыменовании.

Во-вторых, вы можете использовать locking (в основном Java synchronized), чтобы разрешить увеличение и печать только одного потока за один раз:

(def n-atom (atom 0))

(defn action []
  ; Arbitrarily chose the atom to lock on.
  ; It would probably be better to create a private object that can't be otherwise used.
  (locking n-atom
    (println
      (swap! n-atom inc))))

(defn go []
  ; Have each thread do action twice for a total of four times
  (future (doall (repeatedly 2 action)))
  (future (doall (repeatedly 2 action))))

(go)
1
2
3
4

Замечу, однако, что future здесь действительно не следует использовать. future для случаев, когда вы хотите вычислить результат асинхронно. Он проглатывает ошибки до тех пор, пока не разыменуется, поэтому, если вы никогда не @, вы никогда не увидите исключений, которые появляются внутри future. Было бы лучше использовать пул потоков или, с помощью макроса, для простоты использования, запустить два потока самостоятельно:

(defmacro thread
  "Starts the body in a new thread."
  [& body]
  `(doto (Thread. ^Runnable (fn [] ~@body))
         (.start)))

(defn go []
  (thread (doall (repeatedly 2 action)))
  (thread (doall (repeatedly 2 action))))
...