это очень странная проблема, я маскирую метку времени для String, используя TimestampConverter, однако он выдает исключение NullPointer, когда данные нулевые, я вижу, что код был исправлен и объединен в различных ветвях kafka.
я вижу, что исправление было сделано для этого PR и объединено с (1.0,1.1, ... 2.3) https://github.com/apache/kafka/commit/c05ed1eae466ef8afd8d67022d206d7d9bb24838.
мой код выглядит примерно так, для каждого поля
"transforms": "LOAD_DTToString",
"transforms.LOAD_DTToString.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.LOAD_DTToString.format": "yyyy-MM-dd HH:mm:ss.SSSSSS",
"transforms.LOAD_DTToString.target.type": "string",
"transforms.LOAD_DTToString.field": "LOAD_DT" `
Ссылка: https://docs.confluent.io/current/connect/transforms/timestampconverter.html.
Я вижу эту банку в пути к классам.
kafka_2.11-2.1.0-cp1.jar.
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:292)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:228)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
статус подключения
{"name":"prm-act","connector":{"state":"RUNNING","worker_id":"lvappi02210.cloud.bns:8084"},"tasks":[{"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:292)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:228)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.NullPointerException\n","id":0,"worker_id":"lvappi02210.cloud.bns:8084"}],"type":"source"}