Kafka JDBC Sink Connector выбрасывает NullPointerException при обработке сообщения, в схеме которого есть необязательное поле (здесь — «parentId»). Я что-то упустил? Я использую стандартные («из коробки») JsonConverter и JDBC Sink Connector.
Сообщение в топике Kafka:
{
"schema":{
"type":"struct",
"fields":[
{
"field":"id",
"type":"string"
},
{
"field":"type",
"type":"string"
},
{
"field":"eventId",
"type":"string"
},
{
"field":"parentId",
"type":"string",
"optional":true
},
{
"field":"created",
"type":"int64",
"name":"org.apache.kafka.connect.data.Timestamp",
"version":1
}
]
},
"payload":{
"id":"asset-1",
"type":"parcel",
"eventId":"evt-1",
"created":1501834166000
}
}
И коннектор со следующими свойствами:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=admin
topics=asset-topic
tasks.max=1
batch.size=1
auto.evolve=true
connection.user=admin
auto.create=true
connection.url=jdbc:postgresql://postgres-db:5432/fabricdb
value.converter=org.apache.kafka.connect.json.JsonConverter
pk.mode=record_value
pk.fields=id
Но JDBC Sink Connector завершается с ошибкой на необязательном поле parentId:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:698)
at org.apache.kafka.connect.json.JsonConverter.access$000(JsonConverter.java:61)
at org.apache.kafka.connect.json.JsonConverter$12.convert(JsonConverter.java:181)
at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:742)
at org.apache.kafka.connect.json.JsonConverter.jsonToConnect(JsonConverter.java:361)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:350)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more