Таблицы требуют первичного ключа при создании таблицы с темой kafka - PullRequest
0 голосов
/ 14 июля 2020

У меня есть таблица mysql вот такая: введите описание изображения здесь Я использую коннектор kafka, чтобы добавить эту таблицу в kafka topi c:

ksql> CREATE SOURCE CONNECTOR SOURCE_MYSQL_01 WITH (
    'connector.class' = 'io.debezium.connector.mysql.MySqlConnector',
    'database.hostname' = 'mysql',
    'database.port' = '3306',
    'database.user' = 'debezium',
    'database.password' = 'dbz',
    'database.server.id' = '42',
    'database.server.name' = 'asgard',
    'table.whitelist' = 'demo.customers',
    'database.history.kafka.bootstrap.servers' = 'kafka:29092',
    'database.history.kafka.topic' = 'dbhistory.demo' ,
    'include.schema.changes' = 'false',
    'transforms'= 'unwrap,extractkey',
    'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.extractkey.type'= 'org.apache.kafka.connect.transforms.ExtractField$Key',
    'transforms.extractkey.field'= 'id',
    'key.converter'= 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter'= 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url'= 'http://schema-registry:8081'
    );

, а затем я хочу использовать это для создания таблицы на его основе:

CREATE TABLE  CUSTOMERS WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');

тогда я получаю эту ошибку:

Tables require a PRIMARY KEY. Please define the PRIMARY KEY.
Use a partial schema to define the primary key and still load the value columns from the Schema Registry, for example:
        CREATE TABLE CUSTOMERS (ID INT PRIMARY KEY) WITH (...);

когда я меняю это, как было предложено:

CREATE TABLE CUSTOMERS (id int primary key) WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');

Я получаю эту ошибку:

Duplicate column names: `ID`

выполнил поиск, но все еще застрял здесь. что с этим не так и как я могу это решить? Спасибо

1 Ответ

0 голосов
/ 29 июля 2020

Проблема в том, что имя, которое вы назначаете столбцу PK, конфликтует с именем столбца в схеме Avro, загружаемой из реестра схем.

Вы можете назвать свой ключевой столбец как хотите , поскольку имя столбца нигде не сохраняется, поэтому просто назовите его как-нибудь, что не является классом sh, например customer_id.

CREATE TABLE CUSTOMERS (customer_id int primary key) WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');

Также отмечает, что у вас есть:

'key.converter' = 'org. apache .kafka.connect.storage.StringConverter'

Что, как я полагаю, будет преобразовывать ключ в STRING. Не уверен, что это сделано намеренно. ksqlDB также будет работать с INT PK ...

...