Как использовать Apache Flink DataStream API для вывода потока пар событий? - PullRequest
0 голосов
/ 18 июня 2020

Обувь (событие) определяется как ее цвет и isLeft (Если обувь предназначена для левой ноги, то isLeft=true, иначе false).

Tuple2<String, Boolean> leftBlueShoe  = Tuple2.of("blue", true);
Tuple2<String, Boolean> rightBlueShoe = Tuple2.of("blue", false);
// unbounded stream of shoes is as follows
DataStream<Tuple2<String, Boolean>> streamOfShoes = ... 
// somthing like - env.fromElements(leftBlueShoe, rightRedShoe, leftGreenShoe, rightBlueShoe, ...);

Как формировать пара обуви того же цвета и ожидайте, что согласованные пары будут отправлены немедленно , несовпадающая обувь будет ждать своей пары до конца окна.

DataStream<Tuple5<String, Boolean, String, Boolean, String>> shoePairs = ...
// few events from shoePairs stream:
Tuple5<> shoePair   = Tuple5.of("blue", true, "blue", false, "pairFound");
Tuple5<> notShoePair= Tuple5.of("red", true, "red", false, "pairNotFound"); // Even if pair not found in window we tagged and kept in stream

Попытки подходов (игнорируйте это, чтобы избежать путаница):

  1. Разделением потоков на левое и правое и оконным соединением (Это требует затрат?)

  2. Работа с окнами в одном потоке с TumblingProcessingTimeWindow и настраиваемым логом соединения c в apply(). Это окно не запускается мгновенно, даже если все события объединены в пару .

1 Ответ

1 голос
/ 18 июня 2020

Одно из упражнений в обучении Flink посвящено поиску пар событий; по духу это похоже на то, о чем вы просите. См. Упражнение Поездки и проезд , в котором для создания пары используется RichCoFlatMapFunction.

Решение здесь предполагает, что идеальное сочетание всегда возможно, поэтому оно не касается случая непревзойденные пары. Но вы можете найти вариант здесь , который продвигает этот шаг дальше. В этом примере используются таймеры в CoProcessFunction для обнаружения несовпадающих пар.

Другие моменты:

Разделение потока на левый и правый подпотоки должно иметь незначительную стоимость.

Думаю, CoGroupFunction должно работать. Если вы попробовали это, и это не сработало, возможно, вы использовали временное оконное управление событиями, а последний водяной знак отсутствовал, что не позволяло закрыть окно.

Обновление:

Посмотрев в вашем коде я вижу проблему в реализации. Средство извлечения временных меток использует системные часы, а не метки времени в событиях. Это даст вам нечто похожее (но хуже) на использование времени обработки. Я говорю «хуже, чем время обработки», потому что вы позволяете событиям быть неупорядоченными, что увеличивает задержку и предотвращает закрытие окна до тех пор, пока не появится событие, выходящее за пределы конечной точки окна. Это означает, что последнее окно никогда не может быть запущено.

В качестве теста попробуйте переключить временные характеристики c на время обработки, удалите assignTimestampsAndWatermarks и посмотрите, правильно ли работает CoGroupFunction. Вы также можете использовать время приема, если вы удалите водяной знак и позволите Flink обрабатывать его (с временем обработки водяные знаки не имеют значения; со временем приема Flink делает водяные знаки за вас, если вы его не переопределите).

Если вы хотите использовать время события в своем приложении, используйте конечные источники в вашем тестировании. Когда конечные источники (например, чтение из файла или коллекции) достигают конца своего ввода, они отправляют очень большой водяной знак через задание, которое закрывает все открытые windows.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...