core.asyn c channel - отслеживание того, что происходит, когда (пример) - PullRequest
0 голосов
/ 06 февраля 2020

Я в гл. 6 из 7 моделей параллелизма Пола Мясника за 7 недель , которые фокусируются на core.async.

У нас есть следующая функция

(defn map-chan [f from]                                                         
  (let [to (chan)]
    (go-loop []
      (when-let [x (<! from)]    
        (>! to (f x))            
        (println "parking channel write.") 
        (recur))                         
      (close! to))
    (println "map-chan done.")
    to)) 

Я добавил printlns Я сам, чтобы выяснить точный порядок вычислений, о котором я хочу спросить здесь.

Мы можем запустить его так:

(def ch (to-chan (range 10)))             ; [1]
(def mapped (map-chan (partial * 2) ch))  ; [2]
(<!! (async/into [] mapped))              ; [3]

;; [1] Create & rtn a channel from els of seq, closing it when seq fin.
;; [2] map-chan returns immediately, with blocked go blocks inside of it.
;; [3] calling async/into finally triggers the parked channel writes, as seen below.

в ответе:

channels.core=> (def ch (to-chan (range 10)))
#'channels.core/ch
channels.core=> (def mapped (map-chan (partial * 2) ch))
map-chan done.
#'channels.core/mapped
channels.core=> (<!! (async/into [] mapped))
parking channel write.
parking channel write.
parking channel write.
parking channel write.
parking channel write.
parking channel write.
parking channel write.
parking channel write.
parking channel write.
parking channel write.
[0 2 4 6 8 10 12 14 16 18]
channels.core=> 

Вопрос

У нас есть (syn c) (то есть небуферизованный) канал, который имеет и писатель, и читатель, готовый к go. Почему моя "запись канала парковки" выше не запускается до тех пор, пока не будет вызван async/into? (Это не канал, прочитанный с помощью <!!, который запускает его, а сам async/into - это легко проверить). Я не жалуюсь на это, просто пытаюсь понять, почему след таков. Действительно ли каналы ленивы? Он еще не упомянул об этом в книге.

Обратите внимание, что зависимость от этого кода равна org.clojure/core.async "0.1.267.0-0d7780-alpha", если это имеет какое-то значение.

Кроме того, в книге он использовал буферизованный канал длины 10. Тем не менее, я также попробовал его с небуферизованным (syn c) каналом, и результат кажется тем же.

1 Ответ

1 голос
/ 06 февраля 2020

Ваш выходной канал to имеет нулевой размер, поэтому запись не может быть произведена, пока не будет запрошен соответствующий дубль. Посмотрите на модифицированную версию вашего кода:

(ns tst.demo.core
  (:use tupelo.core tupelo.test )
  (:require
    [clojure.core.async :as async]
    ))

(defn map-chan [f from]
  (let [to (async/chan)]
    (async/go
      (loop []
        (when-let [x (async/<! from)]
          (println "put - pre")
          (async/>! to (f x))
          (println "put - post")
          (recur)))
      (async/close! to))
    (println "map-chan returns output buffer")
    to))

(dotest
(println :1)
(spyx
  (def ch (async/to-chan (range 10)))) ; [1]

(Thread/sleep 2000) (println :2)
(spyx
  (def mapped (map-chan (partial * 2) ch))) ; [2]

(Thread/sleep 2000) (println :3)
(spyx
  (async/<!! (async/into [] mapped))) ; [3]
  )

с результатами:

-------------------------------
   Clojure 1.10.1    Java 13
-------------------------------

lein test tst.demo.core
:1
(def ch (async/to-chan (range 10))) => #'tst.demo.core/ch
:2
map-chan returns output buffer
(def mapped (map-chan (partial * 2) ch)) => #'tst.demo.core/mapped
put - pre
:3
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
(async/<!! (async/into [] mapped)) => [0 2 4 6 8 10 12 14 16 18]

Итак, go l oop действительно запускается сразу, но первая операция put блокируется до тех пор, пока async/into на шаге [3] не произойдет.

Если мы используем буферный выходной канал длиной 20, мы увидим, что go l oop работает до того, как произойдет шаг [3]:

...
(let [to (async/chan 20)]
   ...

с результатом:

:1
(def ch (async/to-chan (range 10))) => #'tst.demo.core/ch
:2
map-chan returns output buffer
(def mapped (map-chan (partial * 2) ch)) => #'tst.demo.core/mapped
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
:3
(async/<!! (async/into [] mapped)) => [0 2 4 6 8 10 12 14 16 18]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...