Мне трудно понять то, что я считал довольно простой концепцией в асинхронной библиотеке Clojure. По сути, я создаю два канала с конвейером, где выходной канал создается с использованием функции take входного канала.
Насколько я понимаю, цель выполнения состоит в том, чтобы ограничить количество элементов, которые канал получит до того, как закроется сам (если входной канал еще не закрыт к этому времени). Однако примеры кода, с которыми я играл, не дают ожидаемых результатов.
Возьмите следующий код, например:
(def in (chan 1))
(def out (async/take 5 in 1))
(doseq [i (range 10)]
(go (>! in i)))
(pipeline 4 out (filter even?) in)
(go-loop []
(when-some [val (<! out)]
(println val)
(recur))))
Я ожидал, что конвейер будет отфильтровывать нечетные числа и только передавать четные числа в выходной канал, когда выходной канал получит 5 четных чисел, которые он закроет. Однако то, что я увидел, было как нечетные, так и четные числа, напечатанные в REPL, что-то вроде следующего:
2
7
4
0
8
6 * * +1010
В этот момент выходной канал все еще не закрылся, и при запуске доза q во второй раз выведет другое значение, прежде чем окончательно закроется.
Я невероятно озадачен тем, что здесь происходит, это работает как талисман при использовании take, а не конвейера, и также работает, когда не используется take, но все еще используется конвейер, использование двух в комбинации - это совсем другое история кажется. Я что-то упускаю здесь очевидное? Извиняюсь, если это простая ошибка, это моя первая (хотя и наивная) попытка использования core.async.