Невозможно создать коннектор kafka с помощью REST API - PullRequest
0 голосов
/ 26 сентября 2019

Я пытаюсь запустить работников Kafka в распределенном режиме.В отличие от автономного режима, мы не можем передать файл свойств соединителя при запуске работника в распределенном режиме.В распределенном режиме рабочие запускаются отдельно, и мы разворачиваем и управляем соединителями на этих работниках, используя REST API

Reference Link - https://docs.confluent.io/current/connect/managing/configuring.html#connect-managing-distributed-mode

Я попытался создать соединитель, передав следующие значения вкоманда curl и выполнила ее

curl -X POST -H "Content-Type: application/json" --data '{"name":"sailpointdb","connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","tasks.max":"1","connection.password " : " abc","connection.url " : "jdbc:mysql://localhost:3306/db","connection.user " : "abc" ,"query" : " SELECT * FROM (SELECT NAME, FROM_UNIXTIME(completed/1000) AS 
TASKFAILEDON FROM abc WHERE COMPLETION_STATUS = 'Error') as A","mode" : " timestamp","timestamp.column.name" : "TASKFAILEDON","topic.prefix" : "dbevents","validate.non.null" : "false" }}' http://localhost:8089/connectors/

Я получаю сообщение об ошибке ниже: curl: (3) URL-адрес использует неверный / недопустимый формат или отсутствует URL-адрес

Пожалуйста, дайте мне знать, что не так с вышеупомянутымcurl, я что-то здесь упускаю

1 Ответ

0 голосов
/ 26 сентября 2019
  1. В вашем JSON есть дополнительная закрывающая фигурная скобка, которая не поможет
  2. Если вы POST переходите на /connectors, вам нужен корень name и configэлементы уровня.Но я рекомендую использовать PUT /config, потому что вы можете перезапустить его, чтобы обновить конфигурацию, если вам нужно

Попробуйте это:

curl -X PUT -H  "Content-Type:application/json" \
      http://localhost:8089/connectors/source-jdbc-sailpointdb-00/config \
      -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.password ": " abc",
        "connection.url ": "jdbc:mysql://localhost:3306/db",
        "connection.user ": "abc",
        "query": " SELECT * FROM (SELECT NAME, FROM_UNIXTIME(completed/1000) AS TASKFAILEDON FROM abc WHERE COMPLETION_STATUS = 'Error') as A",
        "mode": " timestamp",
        "timestamp.column.name": "TASKFAILEDON",
        "topic.prefix": "dbevents",
        "validate.non.null": "false"
    }'
...