Ошибка коннектора Debezium MongoDB: org. apache .kafka.connect.errors.ConnectException: Превышен допуск в обработчике ошибок - PullRequest
1 голос
/ 29 апреля 2020

Я пытаюсь развернуть новый коннектор Debezium для MongoDB с помощью Transforms. Конфигурация выглядит следующим образом:

{"name": "mongo_source_connector_autostate",
    "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", 
    "tasks.max":1,
    "initial.sync.max.threads":4,
    "mongodb.hosts": "rs0/FE0VMC1980:27017", 
    "mongodb.name": "mongo", 
    "collection.whitelist": "DASMongoDB.*_AutoState",
    "transforms": "unwrap",
    "transforms.unwrap.type" : "io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope",
    "transforms.sanitize.field.names" : true
    }}

Однако при сбое соединителя возникает следующая ошибка:

 org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:290)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:316)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.avro.SchemaParseException: Illegal initial character: 10019_AutoState
        at org.apache.avro.Schema.validateName(Schema.java:1528)
        at org.apache.avro.Schema.access$400(Schema.java:87)
        at org.apache.avro.Schema$Name.<init>(Schema.java:675)
        at org.apache.avro.Schema.createRecord(Schema.java:212)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:893)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:732)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:726)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:365)
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
        at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:62)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:290)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 11 more

Я запустил соединитель в распределенном режиме со следующей конфигурацией:

...
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
...

Примечание: у меня есть другой разъем без каких-либо преобразований. Он работает просто отлично.

Я хотел бы получить помощь по этому поводу. Заранее спасибо.

Ответы [ 2 ]

1 голос
/ 30 апреля 2020

Одно из ваших полей нарушает правила именования Avro. В вашем случае это выглядит так:

Часть имени полного имени, имена полей записи и символы перечисления должны:

  • начинаться с [A-Za-z_]

Но 10019_AutoState нарушает правило, поскольку оно начинается с числовых значений. Вы можете изменить его на что-то вроде AutoState10019


Вы можете просмотреть полный список со всеми ограничениями именования полей записи здесь .

1 голос
/ 30 апреля 2020

Какая версия Debezium? Если это проблема с 1.1 / 1.2, пожалуйста, поднимите вопрос Jira. Имя схемы необходимо очистить, и мне кажется, что в этом случае ошибка происходит из имени коллекции 10019_AutoState, а имя схемы не очищено, так как оно не должно начинаться с номера.

...