Я пытаюсь использовать Confluent InfluxDB Sink Connector , чтобы получить данные из раздела kafka в мою InfluxDB.
Во-первых, я передаю данные в тему кафки из файла журнала с помощью nifi, и это хорошо работает. В теме Кафка получить данные, как показано ниже:
{
"topic": "testDB5",
"key": null,
"value": {
"timestamp": "2019-03-20 01:24:29,461",
"measurement": "INFO",
"thread": "NiFi Web Server-795",
"class": "org.apache.nifi.web.filter.RequestLogger",
"message": "Attempting request for (anonymous)
},
"partition": 0,
"offset": 0
}
Затем я создаю соединитель приемника InfluxDB через интерфейс Kafka Connect и получаю следующее исключение:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at io.confluent.influxdb.InfluxDBSinkTask.put(InfluxDBSinkTask.java:140)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
... 10 more
Но если я вручную введу данные в другую тему testDB1 с помощью
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic testDB1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"measurement","type":"string"},{"name":"timestamp","type":"string"}]}'
Это работает, мой InfxDB может получить данные.
Вот конфигурация подключения:
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
influxdb.url=http://myurl
tasks.max=1
topics=testDB5
конфигурация подключения темы testDB1 такая же, кроме имени темы.
Есть ли проблемы в nifi? Но он может хорошо передавать данные в тему.