Kafka Connect игнорирует указанные стратегии - PullRequest
0 голосов
/ 26 апреля 2019

Я хочу опубликовать данные нескольких таблиц в одной и той же теме Kafka, используя приведенную ниже конфигурацию коннектора, но я вижу ниже исключение

Исключение

Причина: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: регистрируемая схема несовместима с более ранней схемой; код ошибки: 409

Соединитель, похоже, игнорирует установленные свойства стратегии субъекта и продолжает использовать старые $ {topic} -key и $ {topic} -значения предметов.

[2019-04-25 22:43:45,590] INFO AvroConverterConfig values: 
    schema.registry.url = [http://schema-registry:8081]
    basic.auth.user.info = [hidden]
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    schema.registry.basic.auth.user.info = [hidden]
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

Конфигурация разъема

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
      "name": "two-in-one-connector",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "xxxxxxx",
        "database.port": "3306",
        "database.user": "xxxxxxx",
        "database.password": "xxxxxxxxx",
        "database.server.id": "18405457",
        "database.server.name": "xxxxxxxxxx",
        "table.whitelist": "customers,phone_book",
        "database.history.kafka.bootstrap.servers": "broker:9092",
        "database.history.kafka.topic": "dbhistory.customer",
        "transforms": "dropPrefix",
        "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.dropPrefix.regex":"(.*)",
        "transforms.dropPrefix.replacement":"customer",
        "key.converter.key.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
        "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
      }
    }'

1 Ответ

0 голосов
/ 26 апреля 2019

Попробуйте установить классы стратегии ниже параметров в файле конфигурации соединителя (JSON) вместо " key.converter.key.subject.name.strategy " и " value.converter.value.subject.name.strategy"

"key.subject.name.strategy" "value.subject.name.strategy"

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...