Я нахожусь в процессе изучения платформы Confluent (Kafka, Ksql и т. Д.).Я передаю данные в тему Kafka, используя Debezium с Kafka Connect.Одно из полей в моей таблице базы данных «log» называется «register», которое является отметкой времени добавления записи.
Для справки структура журнала таблицы (в исходных базах данных MySQL) имеет видследующим образом:
CREATE TABLE `log` (
`code` varchar(9) NOT NULL,
`register` datetime NOT NULL,
`entry` mediumtext NOT NULL,
PRIMARY KEY (`code`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1
Я передаю данные из таблицы «log» в двух базах данных в один раздел Kafka, используя следующую конфигурацию, которая работает как задумано.
"transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.topicRoute.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.topicRoute.replacement": "merged.$3",
Япытаясь установить поток KSQL, который создает новый ключ, который представляет собой конкатенацию исходной базы данных (из метаданных, сгенерированных Debezium) и поля кода из таблицы журнала вместе с остальными полями из таблицы.Цель этого состоит в том, чтобы производный ключ был полностью уникальным при отправке в приемник (в настоящее время он подключается к другой базе данных MySQL, которая содержит одну таблицу журналов, содержимое которой должно быть объединенной копией таблиц журналов двух исходных баз данных)
Запрос, который я пытаюсь выполнить:
SELECT source->db + '.' + after->code AS KeyValue, after->register, after->entry FROM MERGED_LOG LIMIT 1;
Однако возникает следующая ошибка:
line 1:59: mismatched input 'register' expecting {'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'IF', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Statement: SELECT source->db + '.' + after->code AS KeyValue, after->register, after->entry FROM MERGED_LOG LIMIT 1;
Caused by: line 1:59: mismatched input 'register' expecting {'INTEGER', 'DATE',
'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE',
'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'SHOW',
'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY',
'MAP', 'SET', 'RESET', 'SESSION', 'IF', IDENTIFIER, DIGIT_IDENTIFIER,
QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Caused by: org.antlr.v4.runtime.InputMismatchException
Я не вижу нигде, что говорит о том, что«регистрация» - это какой-то зарезервированный термин.
Кто-нибудь может помочь?В качестве альтернативы можно предложить способ изменения имени поля при преобразовании, учитывая, что я не могу сгладить сообщение, сгенерированное Debezium, поскольку мне нужно иметь возможность получить имя исходной базы данных