У меня есть поток сегментов Оникса, которые являются сообщениями с отметкой времени (поступающими в хронологическом порядке). Скажем, они выглядят так:
{:id 1 :timestamp "2018-09-04 13:15:42" :msg "Hello, World!"}
{:id 2 :timestamp "2018-09-04 21:32:03" :msg "Lorem ipsum"}
{:id 3 :timestamp "2018-09-05 03:01:52" :msg "Dolor sit amet"}
{:id 4 :timestamp "2018-09-05 09:28:16" :msg "Consetetur sadipscing"}
{:id 5 :timestamp "2018-09-05 12:45:33" :msg "Elitr sed diam"}
{:id 6 :timestamp "2018-09-06 08:14:29" :msg "Nonumy eirmod"}
...
Для каждого временного окна (одного дня) в данных я хочу выполнить вычисления на множестве всех его сегментов. Т.е. в этом примере я бы хотел работать с сегментами с идентификаторами 1 и 2 (для 4 сентября), затем с идентификаторами 3, 4 и 5 (для 5 сентября) и т. Д.
Onyx предлагает окна и триггеры, и они должны делать то, что я хочу, из коробки. Если я использую окно :window/type :fixed
и агрегирую по :window/range [1 :day]
относительно :window/window-key :timestamp
, я буду агрегировать все сегменты каждого дня.
Чтобы запускать мои вычисления только по прибытии всех сегментов дня, Onyx предлагает триггерное поведение :onyx.triggers/watermark
. Согласно документации , она должна выстрелить
, если значение :window/window-key
в сегменте превышает верхнюю границу в пределах активного окна
Однако триггер не срабатывает , хотя я вижу, что более поздние сегменты уже поступают, и несколько окон должны быть заполнены. В качестве проверки работоспособности я попробовал простой :onyx.triggers/segment
триггер, который работал как положено.
Моя неудачная попытка создать минимальный пример:
Я изменил фиксированные окна игрушечное задание , чтобы проверить срабатывание водяного знака, и он работал там .
Однако Я обнаружил , что в этой игрушечной работе причина срабатывания триггера водяного знака может быть:
Закрыл ли входной канал? Возможно, только что выполненная работа может вызвать срабатывание водяного знака.
Другим аспектом, который взаимодействует с запуском водяных знаков, является распределенная работа над задачами по пирам .
Комментарии к выпуску # 839 (:trigger/emit
, не работающему с :onyx.triggers/watermark
) в репозитории Onyx, указали мне на выпуск # 840 ( Водяной знак не работает с темой Kafka, имеющей> 1 раздел ), где я нашел эту подсказку (выделение мое):
Проблема в том, что все ваши данные заканчиваются на одном разделе, а водяные знаки всегда принимают минимальный водяной знак для всех входных данных peers (и при использовании нативного водяные знаки kafka, минимальный водяной знак для данного пира).
Когда вы вызываете g / send с небольшими объемами данных и автоматическим назначением раздела, все ваши данные оказываются в одном разделе, что означает, что партнер другого раздела продолжает испускать водяной знак 0 .
Я обнаружил , что:
Невозможно использовать его с текущим триггером водяного знака, который зависит от источника входного сигнала. Вы можете попытаться извлечь предыдущую реализацию водяного знака [...]
В моем графике задач, однако, сегменты Я хочу агрегировать в Windows, только созданы в некоторой промежуточной задаче , они не происходят из ввода задача как таковая. Сегменты ввода предоставляют только информацию о том, как создавать / извлекать содержимое сегментов для этой промежуточной задачи.
Опять же, эта конструкция прекрасно работает в вышеупомянутом игрушечном задании . Причина в том, что входной канал в какой-то момент закрывается, что завершает работу, что, в свою очередь, вызывает появление водяного знака. Так что мой игрушечный пример на самом деле не очень хорошая модель, потому что это не открытый поток.
Если задание получает рассматриваемые сегменты из фактического источника ввода, но без временных отметок, Onyx, похоже, предоставляет пространство для указания assign-watermark-fn
, который является необязательным атрибутом задачи input . Эта функция устанавливает водяной знак при каждом прибытии нового сегмента. В моем случае это не помогает, поскольку сегменты не происходят из задачи ввода.