Kafka Connect с соединителем JdbcConnectionSource не может создать задачу (соединитель работает, но задача - нет) - PullRequest
4 голосов
/ 22 апреля 2020

Кажется, довольно часто я создаю коннектор Kafka Connect из JdbcConnectionSource на основе запроса, и коннектор успешно создается со статусом «RUNNING», но задача не создается. Просматривая журналы консоли моего контейнера, я не вижу признаков того, что что-то не так, что я могу сказать: ни ошибок, ни предупреждений, ни объяснений того, почему задача не удалась. Я могу заставить работать другие соединители, но иногда это не так.

Как можно получить дополнительную информацию для устранения неполадок, если соединителю не удается создать задачу RUNNING?

Ниже я приведу пример конфигурации моего коннектора.

Я использую Kafka Connect 5.4.1-ccs.

Конфигурация коннектора (это база данных Oracle за JDB C):

{
    "name": "FiscalYear",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": 1,
        "connection.url": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=myhost.example.com)(PORT=1521))(LOAD_BALANCE=OFF)(FAILOVER=OFF)(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=MY_DB_PRI)(UR=A)))",
        "connection.user":"myuser",
        "connection.password":"mypass",
        "mode": "timestamp",
        "timestamp.column.name": "MAINT_TS",
        "topic.prefix": "MyTeam.MyTopicName",
        "poll.interval.ms": 5000,
        "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "numeric.mapping": "best_fit",

        "_comment": "The query is wrapped in `select * from ()` so that JdbcSourceConnector can automatically append a WHERE clause.",
        "query": "SELECT * FROM (SELECT fy_nbr, min(fy_strt_dt) fy_strt_dt, max(fy_end_dt) fy_end_dt FROM myuser.fsc_dt fd WHERE fd.fy_nbr >= 2020 and fd.fy_nbr < 2022 group by fy_nbr)/* outer query must have no WHERE clause so that the source connector can append one of its own */"
    }
}  

И Dockerfile, который создает моего работника:

FROM confluentinc/cp-kafka-connect:latest

# each "CONNECT_" env var refers to a Kafka Connect setting; e.g. CONNECT_REST_PORT refers to setting rest.port
#  see also https://docs.confluent.io/current/connect/references/allconfigs.html

ENV CONNECT_BOOTSTRAP_SERVERS="d.mybroker.example.com:9092"
ENV CONNECT_REST_PORT="8083"
ENV CONNECT_GROUP_ID="MyGroup2" 

ENV CONNECT_CONFIG_STORAGE_TOPIC="MyTeam.ConnectorConfig" 
ENV CONNECT_OFFSET_STORAGE_TOPIC="MyTeam.ConnectorOffsets" 
ENV CONNECT_STATUS_STORAGE_TOPIC="MyTeam.ConnectorStatus" 

ENV CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" 
ENV CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" 

ENV CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter"  
ENV CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" 

ENV CONNECT_LOG4J_ROOT_LOGLEVEL="INFO"

COPY ojdbcDrivers /usr/share/java/kafka-connect-jdbc

(я также установил объявленную REST переменную среды имени хоста через мою диаграмму Хелма, так что поэтому это не так) установите выше.)

После его вращения я создаю соединитель, а затем получаю его из REST "/ status":

{"name":"FiscalYear","connector":{"state":"RUNNING","worker_id":"10.1.2.3:8083"},"tasks":[],"type":"source"}

1 Ответ

2 голосов
/ 22 апреля 2020

Как можно получить дополнительную информацию для устранения неполадок, когда соединителю не удается создать задачу RUNNING?

Я бы увеличил уровень ведения журнала на вашем работнике Kafka Connect. Поскольку вы используете Apache Kafka 2.4, вы можете делать это динамически, что весьма полезно. Выполните этот вызов API REST для вашего сотрудника Kafka Connect:

curl -X PUT http://localhost:8083/admin/loggers/io.confluent \
     -H "Content-Type:application/json" -d '{"level": "TRACE"}'

. При этом все сообщения для любого соединителя Confluent будут увеличены до TRACE. Он также возвращает список отдельных регистраторов, из которых вы можете выбрать различные регистраторы и повернуть их указанный c уровень записи вверх или вниз по мере необходимости. Например:

curl -X PUT http://localhost:8083/admin/loggers/io.confluent.connect.jdbc.dialect.DatabaseDialects \
     -H "Content-Type:application/json" -d '{"level": "INFO"}'

Ссылка: https://rmoff.net/2020/01/16/changing-the-logging-level-for-kafka-connect-dynamically/

...