Ошибка соединителя kafka_Mongodb syn c ОШИБКА WorkerSinkTask {id = mon go -sink-0} Задача уничтожается и не будет восстановлена, пока не будет перезапущена вручную - PullRequest
0 голосов
/ 11 апреля 2020

Запуск syn c jobs

bash-4.4# nohup ./bin/connect-standalone.sh ./config/connect-standalone-mongo.properties ./config/connect-mongo-sink.properties

при получении следующей ошибки ..

ERROR WorkerSinkTask{id=mongo-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Unknown schema type: null
        at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:753)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:374)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more

INFO StandaloneConfig значения:

   access.control.allow.methods =
    access.control.allow.origin =
    admin.listeners = null
    bootstrap.servers = [172.18.0.7:9092]
    client.dns.lookup = default
    config.providers = []
    connector.client.config.override.policy = None
    header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
    internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
    internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
    key.converter = class org.apache.kafka.connect.json.JsonConverter

Пример файла конфигурации как загружено ... INFO AdminClientConfig значения:

bootstrap.servers = [172.18.0.7:9092]
client.dns.lookup = default
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5

[2020-04-11 11: 51: 49,331] INFO MongoSinkTopicConfig значения:

   change.data.capture.handler =
    collection = changedata
    database = z**
    delete.on.null.values = false
    document.id.strategy = com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
    field.renamer.mapping = []
    field.renamer.regexp = []
    key.projection.list =
    key.projection.type = none
    max.batch.size = 0
    max.num.retries = 3
    post.processor.chain = [com.mongodb.kafka.connect.sink.processor.DocumentIdAdder]
    rate.limiting.every.n = 0
    rate.limiting.timeout = 0
    retries.defer.timeout = 5000
    topic = changedata

Что мне изменить или как проверить раковину kafka_mongodb ..? Я также пытаюсь внутри контейнера ... поговорить с внешним миром Монгодб. Kakfa и Zookeeper работают внутри контейнера, у меня есть необходимые плагины и файл конфигурации в правильном месте.

1 Ответ

0 голосов
/ 17 апреля 2020

Убедитесь, что у вас правильный IP-адрес Mongodb, а порты открыты, проверьте брандмауэр и другие параметры безопасности, а также убедитесь, что mongodb прослушивает 0.0.0.0 вместо 127.0.0.1

...