Конечно, вы можете использовать окна для создания мини-пакетов, которые вы сортируете и анализируете, но будет трудно правильно обрабатывать границы окна (что делать, если события, которые должны быть спарены, попадают в разные окна?).
Похоже, это было бы намного проще сделать с помощью потока с ключами и плоской карты с состоянием.Просто используйте RichFlatMapFunction и используйте один элемент состояния ключа (ValueState), который запоминает предыдущее событие для каждого ключа.Затем, когда каждое событие обрабатывается, сравните его с сохраненным событием, создайте результат, если это должно произойти, и обновите состояние.
О работе с состоянием клавиш flink можно прочитать в flink training и в документации flink .
Единственное, что меня беспокоит в вашем случае использования, это то, могут ли ваши события поступить не по порядку.Это тот случай, когда для получения правильных результатов вам нужно сначала отсортировать события по метке времени?Это не тривиально.Если это вызывает озабоченность, я бы предложил использовать Flink SQL с MATCH_RECOGNIZE или библиотеку CEP , обе из которых предназначены для распознавания образов в потоках событий, и будутпозаботьтесь о сортировке потока для вас (вам просто нужно указать временные метки и водяные знаки).
Этот запрос может быть не совсем правильным, но, надеюсь, даст представление о том, как сделать что-то подобное с распознаванием совпадений:
SELECT * FROM Events
MATCH_RECOGNIZE (
PARTITION BY userId
ORDER BY eventTime
MEASURES
A.userId as userId,
A.color as color,
A.size as aSize,
B.size as bSize
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A B)
DEFINE
A AS true,
B AS ( timestampDiff(SECOND, A.eventTime, B.eventTime) < 30)
AND A.color = B.color
AND A.size < B.size )
);
Это также может быть сделано вполне естественно с помощью CEP, где основой для сравнения последовательных событий является использование итерационного условия , и вы можете использовать предложение within
для обработкиограничение по времени.