Пользовательский соединитель Kafka: ошибка соединителя, но задачи все еще выполняются - PullRequest
0 голосов
/ 17 апреля 2020

Я создаю пользовательский соединитель источника Kafka. Этот разъем работает и работает. Я использую оставшийся API для обновления параметров конфигурации соединителя (например, конечная точка источника и ключ подключения). Я использую приведенный ниже API покоя

PUT /connectors/{name}/config

Из-за некоторой проблемы (например, неправильный ключ) API не завершается успешно, а коннектор завершается ошибкой

Выход API как ниже. Обратите внимание, что соединитель находится в состоянии сбоя, но задачи все еще выполняются. На самом деле они все еще «подключены» к старой конечной точке и ключу соединения вместо того, чтобы корректно завершать задачу.

{
    "name": "CustomSrcConnector_V1",
    "connector": {
        "state": "FAILED",
        "worker_id": "XXX.XX.XX.XX:8083",
        "trace": "org.apache.kafka.connect.errors.ConnectException: Exception happened at Create : Exception in getPartitions()\n\tat com.xxx.yyy.kafka.connect.eventhub.CustomSourceConnector.start(CustomSourceConnector.java:102)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)\n\tat org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)\n\tat org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:242)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:908)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.processConnectorConfigUpdates(DistributedHerder.java:345)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:317)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:219)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\n"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "XXX.XX.XX.XX:8083"
        },
        {
            "id": 1,
            "state": "RUNNING",
            "worker_id": "XXX.XX.XX.XX:8083"
        }
    ],
    "type": "source"
}

Я понимаю, что задачи и соединители, как правило, разъединены. Однако я хотел бы знать, есть ли способ корректно завершить выполнение задач, если во время обновления конфигурации Conenctor произошла ошибка соединителя.

...