Автономный коннектор приемника Kafka JDBC, не вставляющий данные в базу данных mysql (с помощью confluent-community-2.12)
Я установил confluent-community-2.12 на centos7 и запустил автономный коннектор приемника jdbc для приема записей изудаленный кафка кластер.Я могу использовать записи с помощью простого Java-потребителя и просматривать данные записи, однако, когда я запускаю соединитель приемника, он загружается и нормально подключается к удаленному кластеру, но просто останавливается на следующем журнале. INFO
[2019-05-01 11:10:21,619] INFO Initializing writer using SQL dialect: MySqlDatabaseDialect (io.confluent.connect.jdbc.sink.JdbcSinkTask:57)
[2019-05-01 11:10:21,620] INFO WorkerSinkTask{id=sink-mysql-standalone-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:301)
Evenпосле запуска коннектора, если я выдаю какие-либо данные в кластер, в коннекторе приемника ничего не происходит.
Я попытался получить результат состояния соединителя, используя:
curl localhost:8083/connectors/sink-mysql-standalone/status
результат выглядит следующим образом:
{"name":"sink-mysql-standalone","connector":{"state":"RUNNING","worker_id":"10.3.0.40:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.3.0.40:8083"}],"type":"sink"}
sink.properties:
name=sink-mysql-standalone
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=my-topic
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# JDBCSink connector specific configuration
connection.url=jdbc:mysql://10.3.0.37:3306/mydb?zeroDateTimeBehavior=convertToNull&useUnicode=yes&characterEncoding=UTF-8
connection.user=myuser
connection.password=mypassword
insert.mode=upsert
table.name.format = tbl_kaf_${topic}
pk.mode=kafka
pk.fields=__connect_topic,__connect_partition,__connect_offset
fields.whitelist=messageId
auto.create=true
auto.evolve=true
Продюсер выпускает следующую запись:
Key: id_5dfbdffe-ffbc-4fbf-925c-a14734304fa8, Value: {
"type" : "text",
"messageId" : "ID:activemq-XXXXXX-XXXXXXXXXXXXX-X:XX:1:2:2",
"correlationId" : "",
"destination" : {
"type" : "queue",
"name" : "qToKafka"
},
"replyTo" : null,
"priority" : 0,
"expiration" : 0,
"timestamp" : 1556549819473,
"redelivered" : false,
"properties" : {},
"payloadText" : "<Some XML Data>",
"payloadMap" : null,
"payloadBytes" : null
}
Пожалуйста, дайте мне знать, что мне здесь не хватает.Спасибо