Clojure - Core.async Pipeline + принять путаницу - PullRequest
0 голосов
/ 09 сентября 2018

Мне трудно понять то, что я считал довольно простой концепцией в асинхронной библиотеке Clojure. По сути, я создаю два канала с конвейером, где выходной канал создается с использованием функции take входного канала.

Насколько я понимаю, цель выполнения состоит в том, чтобы ограничить количество элементов, которые канал получит до того, как закроется сам (если входной канал еще не закрыт к этому времени). Однако примеры кода, с которыми я играл, не дают ожидаемых результатов.

Возьмите следующий код, например:

(def in (chan 1))
(def out (async/take 5 in 1))

(doseq [i (range 10)]
  (go (>! in i)))

(pipeline 4 out (filter even?) in)

(go-loop []
  (when-some [val (<! out)]
    (println val)
    (recur))))

Я ожидал, что конвейер будет отфильтровывать нечетные числа и только передавать четные числа в выходной канал, когда выходной канал получит 5 четных чисел, которые он закроет. Однако то, что я увидел, было как нечетные, так и четные числа, напечатанные в REPL, что-то вроде следующего:

2 7 4 0 8 6 * * +1010

В этот момент выходной канал все еще не закрылся, и при запуске доза q во второй раз выведет другое значение, прежде чем окончательно закроется.

Я невероятно озадачен тем, что здесь происходит, это работает как талисман при использовании take, а не конвейера, и также работает, когда не используется take, но все еще используется конвейер, использование двух в комбинации - это совсем другое история кажется. Я что-то упускаю здесь очевидное? Извиняюсь, если это простая ошибка, это моя первая (хотя и наивная) попытка использования core.async.

1 Ответ

0 голосов
/ 09 сентября 2018

Вы выставили take и pipeline на соревнование. Оба они берут предметы из in и добавляют их в out. Заменить определение out:

(def out (async/chan 3))

например, и получите ожидаемый результат Завершено в 21: 24: 14,403 (время выполнения: 0,063 с)

0
2
4
6
8

Если вы действительно хотите использовать async/take, вы можете сделать это так:

(def first (async/chan 1))
(def second (async/chan 3))
(pipeline 4 second (filter even?) first)
(def third (async/take 3 second))

(defn run []
  (go
    (doseq [i (range 10)]
      (>! first i)))
  (go (loop []
        (when-some [val (<! third)]
          (println val)
          (recur)))))

с результатом:

0
2
4
...