Я пытаюсь начать работу с Onyx , платформой распределенных вычислений в Clojure.В частности, я пытаюсь понять, как агрегировать данные.Если я правильно понимаю документацию, комбинация окна и :trigger/emit
функции должна позволить мне сделать это.
Итак, я изменил пример агрегации (Onyx 0.13.0) тремя способами (см. Гист с полным кодом ):
- in
-main
I println
любые сегменты, помещенные в выходной канал;это работает, как и ожидалось, с исходным кодом, так как он собирает все сегменты и печатает их на стандартный вывод. Я добавляю функцию emit следующим образом:
(defn make-ds
[event window trigger {:keys [lower-bound upper-bound event-type] :as state-event} extent-state]
(println "make-ds called")
{:ds window})
Я добавляю конфигурацию триггера (исходный dump-words
триггер выдан для краткости):
(def triggers
[{:trigger/window-id :word-counter
:trigger/id :make-ds
:trigger/on :onyx.triggers/segment
:trigger/fire-all-extents? true
:trigger/threshold [5 :elements]
:trigger/emit ::make-ds}])
Я изменяю задачу :count-words
с вызова функции identity
натип reduce
, чтобы он не передавал все входные сегменты на выход (и добавил опции конфигурации, которые оникс должен обрабатывать как пакет):
{:onyx/name :count-words
;:onyx/fn :clojure.core/identity
:onyx/type :reduce ; :function
:onyx/group-by-key :word
:onyx/flux-policy :kill
:onyx/min-peers 1
:onyx/max-peers 1
:onyx/batch-size 1000
:onyx/batch-fn? true}
Когда я запускаю это сейчас, я вижу в выходных данных, что функция emit (то есть make-ds
) вызывается для каждого входного сегмента (первый выход поступает из триггера dump-words
исходного кода):
> lein run
[....]
Om -> 1
name -> 1
My -> 2
a -> 1
gone -> 1
Coffee -> 1
to -> 1
get -> 1
Time -> 1
make-ds called
make-ds called
make-ds called
make-ds called
[....]
Однако сборка сегмента от make-ds не доходит до выходного канала, они никогда не печатаются.Если я верну задачу :count-words
в функцию identity
, это будет работать просто отлично.Кроме того, это выглядит так, как будто функция emit вызывается для каждого входного сегмента, тогда как я ожидаю, что она будет вызываться только тогда, когда пороговое условие истинно (т. Е. Всякий раз, когда 5 элементов были агрегированы в окне).
Поскольку тест на эту функциональность в базе кода Onyx (onyx.windowing.emit-aggregate-test
) проходит очень хорошо, я предполагаю, что где-то совершаю глупую ошибку, но затрудняюсь понять, что именно.