Cursive: Clojure's * out *, разные Writers, сбрасывание и упорядочение несогласованности при многопоточности: что происходит? - PullRequest
0 голосов
/ 05 ноября 2018

tl; dr Почему Clojure создает отдельную Writer для потоков в newFixedThreadPool? Почему он может быть сброшен после завершения пула? Почему поведение можно воспроизвести только в Курсиве?

Предположим, у нас есть приложение, которое делает что-то в отдельных потоках и что-то записывает в stdout. Предположим, что после того, как мы все сделали, мы хотим напечатать окончательное сообщение.

Первое, с чем мы столкнемся, это то, что println Clojure, если задано несколько аргументов, будет производить чередующийся вывод. Это покрыто здесь .

Но, похоже, есть другая проблема. Если мы запустим что-то вроде этого:

(defn main []
  (let [pool (make-pool num-threads)]
    (print-multithreaded pool "Hello, world!")
    (shutdown-pool pool))
  (safe-println "All done, have a nice day."))

У нас иногда будет

Hello, world!
All done, have a nice day.

а иногда

All done, have a nice day.
Hello, world!

Может быть flush после каждой записи?

(defn safe-println [& more]
  (.write *out* (str (clojure.string/join " " more) "\n"))
  (.flush *out*))

Не работает. Что работает, так это использование явного взаимодействия Java поверх System.out, например:

(defn safe-println [& more]
  (let [writer (System/out)]
    (.println writer (str (clojure.string/join " " more)))
    (.flush writer)))

Создание writer a (PrintWriter. System/out) или (OutputStreamWriter. System/out) также работает.

Похоже, у нас разные *out* s в наших темах ... Действительно,

(def out *out*)
(defn safe-println [& more]
  (.write out (str (clojure.string/join " " more) "\n"))
  (.flush out))

работает.

Итак, вот вопрос: почему это происходит? С кусочками Java это имеет смысл: System.out является статическим окончанием, поэтому существует только один экземпляр для всех потоков, и все говорит с ним, поэтому все добавляется в один и тот же буфер. При печати в *out* Clojure основной поток и объединенные потоки имеют свои собственные *out* со своими собственными буферами (а для основного потока это PrintWriter, для объединенных - это общий OutputStreamWriter). Во-первых, я не понимаю, почему это так, и я не совсем понимаю, почему это приводит к неосторожному упорядочению: мы явно завершаем все наши потоки до , вызывая окончательный вывод, который должен вызывать неявный flush. Но даже если мы добавим явное flush, результат останется прежним.

Я мог бы упустить некоторые действительно очевидные детали здесь, и я был бы рад, если бы вы помогли мне. Если вы хотите увидеть весь воспроизводимый пример, который я не включил сюда из-за его длины, вот ссылка на суть: https://gist.github.com/trueneu/b8498aa259899a8fc979090fccf632de

РЕДАКТИРОВАТЬ: Первая версия Gist действительно работает, и вы должны повозиться с ним, чтобы сломать его, поэтому я отредактировал его, чтобы продемонстрировать "неправильное" поведение с самого начала.

Также, чтобы устранить недоразумения, вот скриншот из Cursive: https://ibb.co/jHqSL0

EDIT2: На это было указано в первоначальном вопросе, но я сделаю акцент. Понимание сути и механизма этого поведения - половина вопроса. Новый *out* создается не для каждого потока. Но, похоже, создается отдельный пул потоков. (Для этого вывода уменьшите num-threads до 1 и добавьте печать от (.toString *out*) до safe-println. Увеличение num-threads не приводит к появлению новых адресов объектов):

(main)
java.io.PrintWriter@1dcc77c6
All done, have a nice day.
=> nil
java.io.OutputStreamWriter@7104a76f
Hello, world!

EDIT3: изменено map с doseq после комментария @glts. Кроме того, при запуске из lein repl он всегда выдает правильный вывод, что еще больше смущает меня. Итак, как отметил Дэвид Аренас, похоже, что поведение зависит от обработки выходных данных. Тем не менее, вопросы все еще стоят.

EDIT4: Дэвид Аренас также проверил это в Cider и не может воспроизвести поведение. Похоже, что это как-то связано с реализацией обработки вывода в Nivel в Cursive.

1 Ответ

0 голосов
/ 06 ноября 2018

Clojure *out* не создает экземпляр для каждого потока (это также статический финал), но он использует OutputStreamWriter, который не имеет атомарных гарантий. Вам нужно будет синхронизировать потоки в буфере, поскольку вы пишете в один поток.

Если вы запустите свой код с помощью nrepl, вы увидите, что вы получаете «правильное» поведение. Это потому, что они повторно привязывают out к своему собственному устройству записи, которое использует буфер блокировки.

Сессия nrepl:

(defn- session-out
  "Returns a PrintWriter suitable for binding as *out* or *err*.  All of
   the content written to that PrintWriter will (when .flush-ed) be sent on the
   given transport in messages specifying the given session-id.
   `channel-type` should be :out or :err, as appropriate."
  [channel-type session-id transport]
  (let [buf (clojure.tools.nrepl.StdOutBuffer.)]
    (PrintWriter. (proxy [Writer] []
                    (close [] (.flush ^Writer this))
                    (write [& [x ^Integer off ^Integer len]]
                      (locking buf
                        (cond
                          (number? x) (.append buf (char x))
                          (not off) (.append buf x)
                          ; the CharSequence overload of append takes an *end* idx, not length!
                          (instance? CharSequence x) (.append buf ^CharSequence x (int off) (int (+ len off)))
                          :else (.append buf ^chars x off len))
                        (when (<= *out-limit* (.length buf))
                          (.flush ^Writer this))))
                    (flush []
                      (let [text (locking buf (let [text (str buf)]
                                                (.setLength buf 0)
                                                text))]
                        (when (pos? (count text))
                          (t/send (or (:transport *msg*) transport)
                                  (response-for *msg* :session session-id
                                                channel-type text))))))
                  true)))
...