kafka connect - Как отфильтровать метаданные схемы из полезной нагрузки - PullRequest
0 голосов
/ 10 апреля 2019

Я пытаюсь удалить схему из полезной нагрузки, и вот конфигурации

connector.properties

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
value.converter=org.apache.kafka.connect.json.JsonConverter
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/employee_db?user=root&password=root
table.whitelist=testemp
mode=incrementing
incrementing.column.name=employee_id
topic.prefix=test-mysql-jdbc-

и ниже мой рабочий.properties

bootstrap.servers=localhost:9092


key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false


internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

offset.flush.interval.ms=10000
plugin.path=C:\Users\name\Desktop\kafka\libs

выход:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"employee_id"},{"type":"string","optional":false,"field":"first_name"}],"optional":false,"name":"testemp"},"payload":{"employee_id":2,"first_name":"test"}}

исключение:

{"payload":{"employee_id":2,"first_name":"test"}}

Iпопытался отключить value.converter.schemas.enable = false в работнике, как предложено в здесь по-прежнему не действует

Я что-то упустил?

1 Ответ

1 голос
/ 10 апреля 2019

Есть два варианта исправить это:

  • Удалить свойство value.converter из конфигурации вашего коннектора (вы используете тот же value.converter)
  • Установить value.converter.schemas.enable=false в вашем коннектореконфигурация.

Схема добавлена ​​в сообщение, потому что вы перезаписали конвертер значений и не отключили схему (по умолчанию для JsonConverter схема включена).С точки зрения Kafka Connect вы использовали совершенно новый конвертер (он не будет использовать свойства из глобальной конфигурации)

Если вы отключите схему, ваше сообщение будет выглядеть следующим образом:

{
    "employee_id": 2,
    "first_name":"test"
}
...