Понимание слияния core.async в Clojure против ClojureScript - PullRequest
0 голосов
/ 01 марта 2019

Я экспериментирую с core.async на Clojure и ClojureScript, чтобы попытаться понять, как работает merge.В частности, делает ли merge доступными какие-либо значения, введенные для входных каналов, для немедленного приема на объединенном канале.

У меня есть следующий код:

(ns async-merge-example.core
  (:require
   #?(:clj [clojure.core.async :as async] :cljs [cljs.core.async :as async])
   [async-merge-example.exec :as exec]))

(defn async-fn-timeout
  [v]
  (async/go
    (async/<! (async/timeout (rand-int 5000)))
    v))

(defn async-fn-exec
  [v]
  (exec/exec "sh" "-c" (str "sleep " (rand-int 5) "; echo " v ";")))

(defn merge-and-print-results
  [seq async-fn]
  (let [chans (async/merge (map async-fn seq))]
    (async/go
      (while (when-let [v (async/<! chans)]
               (prn v)
               v)))))

Когда я пытаюсь async-fn-timeout с большим значением seq:

(merge-and-print-results (range 20) async-fn-timeout)

Для Clojure и ClojureScript, я получаю ожидаемый результат, так как в результате результаты начинают печататься довольносразу же, с ожидаемыми задержками.

Однако, когда я пытаюсь async-fn-exec с тем же seq:

(merge-and-print-results (range 20) async-fn-exec)

Для ClojureScript я получаю ожидаемый результат, как врезультаты начинают печататься практически сразу, с ожидаемыми задержками.Однако для Clojure, даже несмотря на то, что процессы sh выполняются одновременно (в зависимости от размера пула потоков core.async), результаты, по-видимому, первоначально задерживаются, а затем, в основном, печатаются одновременно!Я могу сделать эту разницу более очевидной, увеличив размер последовательности, например (range 40)

Поскольку результаты для async-fn-timeout такие же, как и для Clojure, и для ClojureScript, палец указывает на различия между Clojureи реализация ClojureScript для exec ..

Но я не знаю, почему эта разница может вызвать эту проблему?

Примечания:

  • Эти наблюдения были сделаныв WSL в Windows 10
  • Исходный код async-merge-example.exec ниже
  • В exec реализация отличается для Clojure и ClojureScript из-за различий между Clojure / Java и ClojureScript / NodeJS.
(ns async-merge-example.exec
  (:require
   #?(:clj [clojure.core.async :as async] :cljs [cljs.core.async :as async])))

; cljs implementation based on https://gist.github.com/frankhenderson/d60471e64faec9e2158c

; clj implementation based on https://stackoverflow.com/questions/45292625/how-to-perform-non-blocking-reading-stdout-from-a-subprocess-in-clojure

#?(:cljs (def spawn (.-spawn (js/require "child_process"))))

#?(:cljs
   (defn exec-chan
     "spawns a child process for cmd with args. routes stdout, stderr, and
      the exit code to a channel. returns the channel immediately."
     [cmd args]
     (let [c (async/chan), p (spawn cmd (if args (clj->js args) (clj->js [])))]
       (.on (.-stdout p) "data"  #(async/put! c [:out  (str %)]))
       (.on (.-stderr p) "data"  #(async/put! c [:err  (str %)]))
       (.on p            "close" #(async/put! c [:exit (str %)]))
       c)))

#?(:clj
   (defn exec-chan
     "spawns a child process for cmd with args. routes stdout, stderr, and
      the exit code to a channel. returns the channel immediately."
     [cmd args]
     (let [c (async/chan)]
       (async/go
         (let [builder (ProcessBuilder. (into-array String (cons cmd (map str args))))
               process (.start builder)]
           (with-open [reader (clojure.java.io/reader (.getInputStream process))
                       err-reader (clojure.java.io/reader (.getErrorStream process))]
             (loop []
               (let [line (.readLine ^java.io.BufferedReader reader)
                     err (.readLine ^java.io.BufferedReader err-reader)]
                 (if (or line err)
                   (do (when line (async/>! c [:out line]))
                       (when err (async/>! c [:err err]))
                       (recur))
                   (do
                     (.waitFor process)
                     (async/>! c [:exit (.exitValue process)]))))))))
       c)))

(defn exec
  "executes cmd with args. returns a channel immediately which
   will eventually receive a result map of 
   {:out [stdout-lines] :err [stderr-lines] :exit [exit-code]}"
  [cmd & args]
  (let [c (exec-chan cmd args)]
    (async/go (loop [output (async/<! c) result {}]
                (if (= :exit (first output))
                  (assoc result :exit (second output))
                  (recur (async/<! c) (update result (first output) #(conj (or % []) (second output)))))))))

1 Ответ

0 голосов
/ 02 марта 2019

Ваша реализация Clojure использует блокировку ввода-вывода в одном потоке.Вы сначала читаете из stdout, а затем в цикле stderr.Оба выполняют блокировку readLine, поэтому они вернутся только после того, как фактически закончат чтение строки.Поэтому, если ваш процесс не создаст одинаковое количество выходных данных для stdout и stderr, один поток в конечном итоге заблокирует другой.

Как только процесс завершится, readLine больше не будет блокироваться и просто вернет nil один разбуфер пуст.Таким образом, цикл просто заканчивает чтение буферизованного вывода и, наконец, завершает объяснение сообщений «все сразу».

Возможно, вы захотите запустить второй поток, который занимается чтением из stderr.

node не блокирует ввод-вывод, поэтому по умолчанию все происходит асинхронно, а один поток не блокирует другой.

...