Я экспериментирую с core.async
на Clojure и ClojureScript, чтобы попытаться понять, как работает merge
.В частности, делает ли merge
доступными какие-либо значения, введенные для входных каналов, для немедленного приема на объединенном канале.
У меня есть следующий код:
(ns async-merge-example.core
(:require
#?(:clj [clojure.core.async :as async] :cljs [cljs.core.async :as async])
[async-merge-example.exec :as exec]))
(defn async-fn-timeout
[v]
(async/go
(async/<! (async/timeout (rand-int 5000)))
v))
(defn async-fn-exec
[v]
(exec/exec "sh" "-c" (str "sleep " (rand-int 5) "; echo " v ";")))
(defn merge-and-print-results
[seq async-fn]
(let [chans (async/merge (map async-fn seq))]
(async/go
(while (when-let [v (async/<! chans)]
(prn v)
v)))))
Когда я пытаюсь async-fn-timeout
с большим значением seq
:
(merge-and-print-results (range 20) async-fn-timeout)
Для Clojure и ClojureScript, я получаю ожидаемый результат, так как в результате результаты начинают печататься довольносразу же, с ожидаемыми задержками.
Однако, когда я пытаюсь async-fn-exec
с тем же seq
:
(merge-and-print-results (range 20) async-fn-exec)
Для ClojureScript я получаю ожидаемый результат, как врезультаты начинают печататься практически сразу, с ожидаемыми задержками.Однако для Clojure, даже несмотря на то, что процессы sh
выполняются одновременно (в зависимости от размера пула потоков core.async
), результаты, по-видимому, первоначально задерживаются, а затем, в основном, печатаются одновременно!Я могу сделать эту разницу более очевидной, увеличив размер последовательности, например (range 40)
Поскольку результаты для async-fn-timeout
такие же, как и для Clojure, и для ClojureScript, палец указывает на различия между Clojureи реализация ClojureScript для exec
..
Но я не знаю, почему эта разница может вызвать эту проблему?
Примечания:
- Эти наблюдения были сделаныв WSL в Windows 10
- Исходный код
async-merge-example.exec
ниже - В
exec
реализация отличается для Clojure и ClojureScript из-за различий между Clojure / Java и ClojureScript / NodeJS.
(ns async-merge-example.exec
(:require
#?(:clj [clojure.core.async :as async] :cljs [cljs.core.async :as async])))
; cljs implementation based on https://gist.github.com/frankhenderson/d60471e64faec9e2158c
; clj implementation based on https://stackoverflow.com/questions/45292625/how-to-perform-non-blocking-reading-stdout-from-a-subprocess-in-clojure
#?(:cljs (def spawn (.-spawn (js/require "child_process"))))
#?(:cljs
(defn exec-chan
"spawns a child process for cmd with args. routes stdout, stderr, and
the exit code to a channel. returns the channel immediately."
[cmd args]
(let [c (async/chan), p (spawn cmd (if args (clj->js args) (clj->js [])))]
(.on (.-stdout p) "data" #(async/put! c [:out (str %)]))
(.on (.-stderr p) "data" #(async/put! c [:err (str %)]))
(.on p "close" #(async/put! c [:exit (str %)]))
c)))
#?(:clj
(defn exec-chan
"spawns a child process for cmd with args. routes stdout, stderr, and
the exit code to a channel. returns the channel immediately."
[cmd args]
(let [c (async/chan)]
(async/go
(let [builder (ProcessBuilder. (into-array String (cons cmd (map str args))))
process (.start builder)]
(with-open [reader (clojure.java.io/reader (.getInputStream process))
err-reader (clojure.java.io/reader (.getErrorStream process))]
(loop []
(let [line (.readLine ^java.io.BufferedReader reader)
err (.readLine ^java.io.BufferedReader err-reader)]
(if (or line err)
(do (when line (async/>! c [:out line]))
(when err (async/>! c [:err err]))
(recur))
(do
(.waitFor process)
(async/>! c [:exit (.exitValue process)]))))))))
c)))
(defn exec
"executes cmd with args. returns a channel immediately which
will eventually receive a result map of
{:out [stdout-lines] :err [stderr-lines] :exit [exit-code]}"
[cmd & args]
(let [c (exec-chan cmd args)]
(async/go (loop [output (async/<! c) result {}]
(if (= :exit (first output))
(assoc result :exit (second output))
(recur (async/<! c) (update result (first output) #(conj (or % []) (second output)))))))))