Вставка / вставка разъема приемника JDBC на основе максимальной отметки времени? - PullRequest
0 голосов
/ 10 октября 2018

Я очень новичок в Kafka connect

Я вставляю записи из нескольких источников в одну таблицу.В некоторых случаях возможно, что некоторые записи достигнут других.Поскольку я не могу контролировать, какой источник будет извлекать какую запись первым, я хочу добавить проверку ключа метки времени записи.

У меня в схеме есть ключ с именем «LastModified_timestamp», в котором я храню метку времени последнего состояния моей записи.

Я хочу добавить проверку в мой соединитель приемника JDBC, где я могуupsert запись, основанная на сравнении значения LastModified_timestamp

Я хочу игнорировать записи, которые имеют более старую отметку времени и хотят только сохранить / вставить самую последнюю.Я не смог найти никакой конфигурации для достижения этого

Есть ли способ, которым я могу достичь этого?Поможет ли написание пользовательского запроса в этом случае?

1 Ответ

0 голосов
/ 10 октября 2018

Разъем JDBC Sink не поддерживает такую ​​функцию.У вас есть два варианта:

  • Преобразование одного сообщения (SMT) - они применяют логику к записям, когда они проходят через Kafka Connect.SMT отлично подходит для таких вещей, как отбрасывание столбцов, изменение типов данных и т. Д. НО не подходит для более сложной обработки и логики, включая логику, которая должна охватывать несколько записей, как ваша здесь

  • Сначала обработайте данные в теме исходного Кафки, чтобы применить необходимую логику.Это можно сделать с помощью Kafka Streams, KSQL и некоторых других сред обработки потоков (например, Spark, Flink и т. Д.).Вам понадобится какая-то логика с сохранением состояния, которая может сработать, если запись будет старше, чем уже обработанная.

Можете ли вы рассказать подробнее о вашем источнике данных для восходящего потока?Возможно, есть лучший способ организовать поступающие данные для обеспечения порядка.

Последняя идея заключается в том, чтобы поместить все записи в целевую БД, а затем использовать логику в запросе к базе данных, используя ее для выбора самой последней (на основе LastModified_timestamp) записи дляданный ключ

Отказ от ответственности: я работаю в Confluent, компании, работающей над проектом с открытым исходным кодом KSQL.

...