Kafka Connect: сбой нескольких исходных соединителей JDBC DB2 - PullRequest
0 голосов
/ 17 декабря 2018

Я пытаюсь использовать Kafka Connect в локальном контейнере Docker (используя официальный образ Confluent) для передачи данных DB2 в кластер Kafka в Openshift (в AWS).Я использую Confluent JDBC-коннектор с моим JDBC-Jar DB2.У меня разные конфиги коннекторов, так как я использую SMT с «transforms.createKey» (для создания моего ключа), а столбцы ключей в моих таблицах имеют разные имена.

Вот мои шаги:

  • создание тем для Kafka Connect для конфигурации, смещения и состояния
  • запуск / создание контейнера Kafka Connect (с помощью переменных env, см. Ниже)
  • создание первого JDBC-коннектора с помощью пост-вызова в мой контейнер Connect(см. ниже)

Пока все работает хорошо, и я вижу, как мои данные отправляются в кластер.Однако, как только я добавляю второй JDBC-соединитель посредством пост-вызова, первый соединитель прекращает отправку данных в кластер, а второй запускается и продолжает загружать и отправлять данные.Некоторое время кажется, что оба соединителя передают данные в кластер, но я предполагаю, что это могут быть данные из соединителя 1, которые все еще сброшены.Проблема в том, что а) даже журналы трассировки не показывают значимую ошибку (по крайней мере, для меня) и б) отображаемые ошибки различаются между попытками (я всегда удалял все темы и контейнер).

Я предполагаю, что это не ошибка, а скорее комбинация настроек, которые должны быть установлены соответствующим образом, и / или мне не хватает понимания некоторых основных функций ядра Kafka Connect.Я уже пытался добавлять и изменять различные конфиги, но, к сожалению, пока ничего не получилось.Я сделал много попыток, но не повезло.Я приложил журналы двух моих последних попыток, а также конфиги.

У кого-нибудь есть идея, какую конфигурацию я могу адаптировать или что посмотреть, чтобы это исправить?Любая помощь приветствуется - спасибо!


Kafka: 2.0.0
Docker image: confluentinc/cp-kafka-connect:5.0.0
DB2: 10.5
JDBC Jar: db2jcc4.jar with version 4.19.76

Журналы 1-я попытка:

[2018-12-17 13:09:15,683] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter)
[2018-12-17 13:09:15,684] ERROR WorkerSourceTask{id=db2-jdbc-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:110)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:409)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:238)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2018-12-17 13:09:15,686] ERROR WorkerSourceTask{id=db2-jdbc-source-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2018-12-17 13:09:15,686] INFO [Producer clientId=producer-4] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2018-12-17 13:09:20,682] ERROR Graceful stop of task db2-jdbc-source-0 failed. (org.apache.kafka.connect.runtime.Worker)
[2018-12-17 13:09:20,682] INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Журналы 2-я попытка:

[2018-12-17 14:01:31,658] INFO Stopping task db2-jdbc-source-0 (org.apache.kafka.connect.runtime.Worker)
[2018-12-17 14:01:31,689] INFO Stopped connector db2-jdbc-source (org.apache.kafka.connect.runtime.Worker)
[2018-12-17 14:01:31,784] INFO WorkerSourceTask{id=db2-jdbc-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-12-17 14:01:31,784] INFO WorkerSourceTask{id=db2-jdbc-source-0} flushing 20450 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-12-17 14:01:36,733] ERROR Graceful stop of task db2-jdbc-source-0 failed. (org.apache.kafka.connect.runtime.Worker)
[2018-12-17 14:01:36,733] INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

скриншотвходящих сообщений в секунду в кластере Kafka

Переменные env Kafka Connect Docker:

-e CONNECT_BOOTSTRAP_SERVERS=my_kafka_cluster:443 \
  -e CONNECT_PRODUCER_BOOTSTRAP_SERVERS="my_kafka_cluster:443" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="kafka-connect" \
  -e CONNECT_REST_PORT=8083 \
  -e CONNECT_GROUP_ID="kafka-connect-group" \
  -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3 \
  -e CONNECT_CONFIG_STORAGE_TOPIC="kafka-connect-config" \
  -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3 \
  -e CONNECT_OFFSET_STORAGE_TOPIC="kafka-connect-offset" \
  -e CONNECT_OFFSET_FLUSH_INTERVAL_MS=15000 \
  -e CONNECT_OFFSET_FLUSH_TIMEOUT_MS=60000 \
  -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3 \
  -e CONNECT_STATUS_STORAGE_TOPIC="kafka-connect-status" \
  -e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://url_to_schemaregistry \
  -e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://url_to_schemaregistry \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_KEY_CONVERTER_SCHEMAS_ENABLE="false" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE="false" \
  -e CONNECT_PLUGIN_PATH=/usr/share/java \
  -e CONNECT_PRODUCER_BUFFER_MEMORY="8388608" \
  -e CONNECT_SECURITY_PROTOCOL="SSL" \
  -e CONNECT_PRODUCER_SECURITY_PROTOCOL="SSL" \
  -e CONNECT_SSL_TRUSTSTORE_LOCATION="/usr/share/kafka.client.truststore.jks" \
  -e CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION="/usr/share/kafka.client.truststore.jks" \
  -e CONNECT_SSL_TRUSTSTORE_PASSWORD="my_ts_pw" \
  -e CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD="my_ts_pw" \
  -e CONNECT_LOG4J_LOGGERS=org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR \
  -e CONNECT_LOG4J_ROOT_LOGLEVEL=INFO \
  -e HOSTNAME=kafka-connect \

Соединители JDBC (изменяются только таблицы и ключевые столбцы):

{
    "name": "db2-jdbc-source",
    "config": 
    {
        "mode":"timestamp",
        "debug":"true",
        "batch.max.rows":"50",
        "poll.interval.ms":"10000",
        "timestamp.delay.interval.ms":"60000",
        "timestamp.column.name":"IBMSNAP_LOGMARKER",
        "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector" ,
        "connection.url":"jdbc:db2://myip:myport/mydb:currentSchema=myschema;",
        "connection.password":"mypw",
        "connection.user":"myuser",
        "connection.backoff.ms":"60000",
        "dialect.name": "Db2DatabaseDialect",
        "table.types": "TABLE",
        "table.poll.interval.ms":"60000",
        "table.whitelist":"MYTABLE1",
        "tasks.max":"1",
        "topic.prefix":"db2_",
        "key.converter":"io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url":"http://url_to_schemaregistry",
        "value.converter":"io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url":"http://url_to_schemaregistry",
        "transforms":"createKey",
        "transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields":"MYKEY1"
    }
}

1 Ответ

0 голосов
/ 18 декабря 2018

Я в конце концов выяснил проблему: я использую соединитель JDBC в режиме отметки времени, а не отметку времени + увеличение, потому что я не могу (всегда) указывать увеличивающийся столбец.Я знал, что это может привести к проблеме, что Connect не может знать, какие записи уже были прочитаны, когда есть несколько с одной и той же отметкой времени.

Большая часть моих строк данных имеет одинаковую метку времени.Когда я добавил второй соединитель, текущая временная метка первого соединителя была сохранена, и Connect начал перебалансировку и, следовательно, потерял информацию о том, какие строки для этой отметки уже прочитаны.Когда коннекторы были в рабочем состоянии и снова работали, первый коннектор продолжал со «следующей меткой времени» и, следовательно, загружал только самые новые строки (которые являются лишь малой частью).

Моя ошибка заключалась в том, что в ситуации, подобнойТаким образом, первый соединитель возобновит работу с предыдущей отметкой времени, а не с «следующей отметкой времени».Для меня было бы более разумно скорее рисковать дубликатами, чем потенциально пропускать данные.

...