Является ли «регистрация» зарезервированным ключевым словом в Ksql, и если да, то как я могу выбрать поле с таким именем - PullRequest
1 голос
/ 14 мая 2019

Я нахожусь в процессе изучения платформы Confluent (Kafka, Ksql и т. Д.).Я передаю данные в тему Kafka, используя Debezium с Kafka Connect.Одно из полей в моей таблице базы данных «log» называется «register», которое является отметкой времени добавления записи.

Для справки структура журнала таблицы (в исходных базах данных MySQL) имеет видследующим образом:

CREATE TABLE `log` (
  `code` varchar(9) NOT NULL,
  `register` datetime NOT NULL,
  `entry` mediumtext NOT NULL,
  PRIMARY KEY (`code`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1

Я передаю данные из таблицы «log» в двух базах данных в один раздел Kafka, используя следующую конфигурацию, которая работает как задумано.

"transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.topicRoute.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.topicRoute.replacement": "merged.$3",

Япытаясь установить поток KSQL, который создает новый ключ, который представляет собой конкатенацию исходной базы данных (из метаданных, сгенерированных Debezium) и поля кода из таблицы журнала вместе с остальными полями из таблицы.Цель этого состоит в том, чтобы производный ключ был полностью уникальным при отправке в приемник (в настоящее время он подключается к другой базе данных MySQL, которая содержит одну таблицу журналов, содержимое которой должно быть объединенной копией таблиц журналов двух исходных баз данных)

Запрос, который я пытаюсь выполнить:

SELECT source->db + '.' + after->code AS KeyValue, after->register, after->entry FROM MERGED_LOG LIMIT 1;

Однако возникает следующая ошибка:

line 1:59: mismatched input 'register' expecting {'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'IF', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Statement: SELECT source->db + '.' + after->code AS KeyValue, after->register, after->entry FROM MERGED_LOG LIMIT 1;
Caused by: line 1:59: mismatched input 'register' expecting {'INTEGER', 'DATE',
        'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE',
        'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'SHOW',
        'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY',
        'MAP', 'SET', 'RESET', 'SESSION', 'IF', IDENTIFIER, DIGIT_IDENTIFIER,
        QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Caused by: org.antlr.v4.runtime.InputMismatchException

Я не вижу нигде, что говорит о том, что«регистрация» - это какой-то зарезервированный термин.

Кто-нибудь может помочь?В качестве альтернативы можно предложить способ изменения имени поля при преобразовании, учитывая, что я не могу сгладить сообщение, сгенерированное Debezium, поскольку мне нужно иметь возможность получить имя исходной базы данных

1 Ответ

2 голосов
/ 15 мая 2019
  1. Да REGISTER - зарезервированное слово, вы должны избегать его в своем DDL.Вы можете получить к нему доступ, цитируя его, стоит попробовать.

  2. Для отбрасывания полей существует преобразование одного сообщения, но оно не работает с вложенными данными.Вы можете попробовать SMT UnwrapFromEnvelope в сочетании с одним для переименования поля.Я не пробовал этот конфиг, но что-то вроде

    "transforms": "unwrap,renameField",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.renameField.renames": "register:notareservedword",
    
...