JDBC Sink Connector: Как отобразить поля из сообщения Кафки в столбец таблицы базы данных - PullRequest
0 голосов
/ 11 октября 2019

Я использую Confluent JDBC Sink Connector для записи всех изменений из темы Kafka в базу данных. Мое сообщение в формате JSON без какой-либо прикрепленной схемы. Например:

{ "key1": "value1", "key2": 100}

Вот моя конфигурация:

name=sink-mysql-1
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=send_1
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
database.hostname=jdbc:mysql://0.0.0.0:3306/test_tbl
database.user=root
database.password=root
insert.mode=upsert
pk.mode=kafka
auto.create=true
auto.evolve=true

Проблема, с которой я столкнулся: из-за устаревшей системы я не могу изменить формат сообщения. Так что мои сообщения - это объект JSON без информации о схеме. Поддерживает ли библиотека отображение полей? Например, отображение из поля A в поле B в базе данных.

Спасибо

1 Ответ

0 голосов
/ 11 октября 2019

У вас есть , чтобы иметь объявленную схему для ваших данных для использования JDBC Sink. На практике это означает, что вам необходимо:

Если у вас нет этой опции для того, когда данные создаются в Kafka, вы можете создать этап обработки потока, который применяет схему. Вы можете сделать это с помощью Kafka Streams или с помощью KSQL. Результатом этого является тема Kafka, которая затем используется в качестве источника для Kafka Connect. Примером этого в KSQL может быть:

-- Declare the schema of the source JSON topic
CREATE STREAM send_1_src (KEY1 VARCHAR, 
                          KEY2 INT) 
  WITH (KAFKA_TOPIC='send_1', 
        VALUE_FORMAT='JSON');

-- Run a continuous query populating the target topic `SEND_1_AVRO` 
-- with the data from `send_1` reserialised into Avro
CREATE STREAM SEND_1_AVRO 
  WITH (VALUE_FORMAT='AVRO') AS 
  SELECT * 
    FROM send_1_src;

  • Чтобы узнать больше о KSQL см. Здесь .
  • Вы можете найти несколько замечательных примеров шаблонов потоковой обработки с необработанными потребителями Kafka против Kafka Streams против KSQL в Kafka Tutorials здесь .
...