Используйте окно сеанса в потоке Kafka, чтобы упорядочить записи и вставить в базу данных MySQL - PullRequest
0 голосов
/ 07 мая 2020

Согласно документации KSQLDB, окно сеанса можно использовать для упорядочивания записей по метке времени и выполнения агрегирования. У меня есть вариант использования, когда я хочу последовательно вставлять записи в MySQL. У меня есть поле отметки времени в моей записи, которое я использовал как ROWTIME, а затем попробовал окно сеанса поверх него и вставил в выходной поток, который будет pu sh в topi c, а затем в RDS. Но в выходном потоке мне не удалось изменить порядок сообщений в соответствии с меткой времени. Пример. Есть две записи - запись 1 в 11:00 и запись 2 в 11:01, и обе имеют одинаковые первичные ключи. Эти две записи поступают в Kafka последовательно - запись 2, запись 1. Но в MYSQL мне нужна запись 1, а затем запись 2, так как запись 1 имеет более низкую временную метку. Я попробовал оконную сессию 5 минут в потоке. Но в потоке вывода она всегда идет как Запись 2, Запись 1.

Возможен ли этот сценарий внутри Kafka? Могу ли я переупорядочить записи внутри Kafka, а затем pu sh в поток с помощью оператора INSERT INTO?

В настоящее время я пытаюсь использовать запросы K SQL, поскольку я использую конфлюентный Kafka.

1 Ответ

1 голос
/ 02 июня 2020

Сессия windows не изменяет порядок записей, они группируют записи вместе, которые имеют один и тот же ключ и находятся в пределах некоторого периода времени друг от друга.

Следовательно, сеанс windows не будет разрешать вы можете изменить порядок сообщений.

Изменение порядка сообщений не является вариантом использования, для которого в настоящее время подходит ksqlDB. Возможно, вам повезет больше, если вы попытаетесь написать приложение на основе Kafka Streams.

Kafka Streams позволит вам использовать хранилище состояний для буферизации входных сообщений в течение некоторого времени, чтобы учесть сообщения не по порядку. У вас должна быть возможность использовать знаки препинания для запуска вывода кэшированных сообщений через некоторый период времени. Вам нужно будет выбрать, как долго вы хотите буферизовать ввод, чтобы разрешить сообщения о нарушении порядка.

...