Flink - обработка последовательных событий в течение времени - PullRequest
0 голосов
/ 22 февраля 2019

У меня есть вариант использования, и я думаю, что мне нужна помощь, как к нему подойти.Поскольку я новичок в потоковой передаче и Flink, я постараюсь быть очень информативным в том, чего я пытаюсь достичь.Извините, если я не использую формальный и правильный язык.

Мой код будет на Java, но мне все равно, чтобы получить код на python или просто псевдокод или подход.

TL: DR

  1. Группировать события одного и того же ключа, которые находятся в пределах определенного времени.
  2. Из этих событий создайте результирующее событие только из 2 самых ближайших (временная область) событий.
  3. Это требует (я думаю) открытия окна для каждого и каждого наступающего события.
  4. Если вы посмотрите вперед на пакетное решение, вы лучше поймете мою проблему.

Справочная информация:

  1. IЯ получаю данные от датчиков в виде потока от Kafka.
  2. Мне нужно использовать eventTime, потому что эти данные не записаны.Время, которое даст мне 90% событий, составляет около 1 минуты.
  3. Я группирую эти события по некоторому ключу.

Что я хочу сделать:

  1. В зависимости от полей некоторых событий - я хотел бы «объединить / смешать» 2 события в новое событие («событие результата»).
  2. Первое условие заключается в том, что этипоследовательные события находятся в пределах 30 секунд друг от друга.
  3. Следующие условия просто проверяют значения некоторых полей и принимают решение.

Мое решение psuedo:

  1. открыть новое окно для КАЖДОГО события.Это окно должно составлять 1 минуту.
  2. Для каждого события, которое происходит в течение этой минуты - я хочу проверить его время и посмотреть, не прошло ли оно 30 секунд с исходного события окна.Если да - проверьте другое условие и пропустите новый поток результатов.

Проблема - Когда приходит новое событие, ему необходимо:

  1. создать новое окно для себя.
  2. Присоединиться только к ОДНОМУ окну из НЕСКОЛЬКО возможных окон, которые находятся в 30 секундах от него.

Вопрос:

Возможно ли это?

Другими словами, моя связь только между двумя "последовательными" событиями.

Большое спасибо.

Возможно, показывает решение для ** BATCH case покажет, что я пытаюсь сделать лучше всего: **

for i in range(grouped_events.length):
    event_A = grouped_events[i]
    event_B = grouped_events[i+1]
    if event_B.get("time") - event_A.get("time") < 30:
        if event_B.get("color") == event_A.get("color"):
            if event_B.get("size") > event_A.get("size"):
                create_result_event(event_A, event_B)

Мои (наивные) попытки до сих пор с Flink в Java

** СуммаФункция - это просто заполнитель для моей функции по созданию нового объекта результата ...

  1. Первое решение - это просто сделать простое временное окно и суммировать его по некоторому полю
  2. Во-вторых, пытается сделать некоторую функцию процесса в окне и, может быть, выполнить итерацию всех событий и проверить мои условия?

    DataStream
    .keyBy(threeEvent -> threeEvent.getUserId())
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .sum("size")
    .print();
    
    
    
    DataStream
    .keyBy(threeEvent -> threeEvent.getUserId())
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new processFunction());
    
    
    
    public static class processFunction extends ProcessWindowFunction<ThreeEvent, Tuple3<Long, Long, Float>, Long, TimeWindow> {
        @Override
        public void process(Long key, Context context, Iterable<ThreeEvent> threeEvents, Collector<Tuple3<Long, Long, Float>> out) throws Exception {
            Float sumOfSize = 0F;
            for (ThreeEvent f : threeEvents) {
                sumOfSize += f.getSize();
            }
    
            out.collect(new Tuple3<>(context.window().getEnd(), key, sumOfTips));
        }
    }
    

1 Ответ

0 голосов
/ 23 февраля 2019

Конечно, вы можете использовать окна для создания мини-пакетов, которые вы сортируете и анализируете, но будет трудно правильно обрабатывать границы окна (что делать, если события, которые должны быть спарены, попадают в разные окна?).

Похоже, это было бы намного проще сделать с помощью потока с ключами и плоской карты с состоянием.Просто используйте RichFlatMapFunction и используйте один элемент состояния ключа (ValueState), который запоминает предыдущее событие для каждого ключа.Затем, когда каждое событие обрабатывается, сравните его с сохраненным событием, создайте результат, если это должно произойти, и обновите состояние.

О работе с состоянием клавиш flink можно прочитать в flink training и в документации flink .

Единственное, что меня беспокоит в вашем случае использования, это то, могут ли ваши события поступить не по порядку.Это тот случай, когда для получения правильных результатов вам нужно сначала отсортировать события по метке времени?Это не тривиально.Если это вызывает озабоченность, я бы предложил использовать Flink SQL с MATCH_RECOGNIZE или библиотеку CEP , обе из которых предназначены для распознавания образов в потоках событий, и будутпозаботьтесь о сортировке потока для вас (вам просто нужно указать временные метки и водяные знаки).

Этот запрос может быть не совсем правильным, но, надеюсь, даст представление о том, как сделать что-то подобное с распознаванием совпадений:

SELECT * FROM Events
MATCH_RECOGNIZE (
  PARTITION BY userId
  ORDER BY eventTime
  MEASURES
    A.userId as userId,
    A.color as color,
    A.size as aSize,
    B.size as bSize
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (A B)
  DEFINE
    A AS true,
    B AS ( timestampDiff(SECOND, A.eventTime, B.eventTime) < 30) 
           AND A.color = B.color 
           AND A.size < B.size )
);

Это также может быть сделано вполне естественно с помощью CEP, где основой для сравнения последовательных событий является использование итерационного условия , и вы можете использовать предложение within для обработкиограничение по времени.

...