Kafka JDBC Sink Connector не использует события от удаленного кластера - PullRequest
1 голос
/ 01 мая 2019

Автономный коннектор приемника Kafka JDBC, не вставляющий данные в базу данных mysql (с помощью confluent-community-2.12)

Я установил confluent-community-2.12 на centos7 и запустил автономный коннектор приемника jdbc для приема записей изудаленный кафка кластер.Я могу использовать записи с помощью простого Java-потребителя и просматривать данные записи, однако, когда я запускаю соединитель приемника, он загружается и нормально подключается к удаленному кластеру, но просто останавливается на следующем журнале. INFO

[2019-05-01 11:10:21,619] INFO Initializing writer using SQL dialect: MySqlDatabaseDialect (io.confluent.connect.jdbc.sink.JdbcSinkTask:57)

[2019-05-01 11:10:21,620] INFO WorkerSinkTask{id=sink-mysql-standalone-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:301) 

Evenпосле запуска коннектора, если я выдаю какие-либо данные в кластер, в коннекторе приемника ничего не происходит.

Я попытался получить результат состояния соединителя, используя:

curl localhost:8083/connectors/sink-mysql-standalone/status 

результат выглядит следующим образом:

{"name":"sink-mysql-standalone","connector":{"state":"RUNNING","worker_id":"10.3.0.40:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.3.0.40:8083"}],"type":"sink"} 

sink.properties:

name=sink-mysql-standalone
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=my-topic

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# JDBCSink connector specific configuration
connection.url=jdbc:mysql://10.3.0.37:3306/mydb?zeroDateTimeBehavior=convertToNull&useUnicode=yes&characterEncoding=UTF-8
connection.user=myuser
connection.password=mypassword
insert.mode=upsert
table.name.format = tbl_kaf_${topic}
pk.mode=kafka
pk.fields=__connect_topic,__connect_partition,__connect_offset
fields.whitelist=messageId
auto.create=true
auto.evolve=true

Продюсер выпускает следующую запись:


Key: id_5dfbdffe-ffbc-4fbf-925c-a14734304fa8, Value: {
 "type" : "text",
 "messageId" : "ID:activemq-XXXXXX-XXXXXXXXXXXXX-X:XX:1:2:2",
 "correlationId" : "",
 "destination" : {
  "type" : "queue",
  "name" : "qToKafka"
 },
 "replyTo" : null,
 "priority" : 0,
 "expiration" : 0,
 "timestamp" : 1556549819473,
 "redelivered" : false,
 "properties" : {},
 "payloadText" : "<Some XML Data>",
 "payloadMap" : null,
 "payloadBytes" : null
}

Пожалуйста, дайте мне знать, что мне здесь не хватает.Спасибо

...