NullPointerException при соединении Confluent Kafka и InfluxDB - PullRequest
0 голосов
/ 19 марта 2019

Я пытаюсь использовать Confluent InfluxDB Sink Connector , чтобы получить данные из раздела kafka в мою InfluxDB.

Во-первых, я передаю данные в тему кафки из файла журнала с помощью nifi, и это хорошо работает. В теме Кафка получить данные, как показано ниже:

  {
    "topic": "testDB5",
    "key": null,
    "value": {
      "timestamp": "2019-03-20 01:24:29,461",
      "measurement": "INFO",
      "thread": "NiFi Web Server-795",
      "class": "org.apache.nifi.web.filter.RequestLogger",
      "message": "Attempting request for (anonymous) 
    },
    "partition": 0,
    "offset": 0
  }

Затем я создаю соединитель приемника InfluxDB через интерфейс Kafka Connect и получаю следующее исключение:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at io.confluent.influxdb.InfluxDBSinkTask.put(InfluxDBSinkTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
    ... 10 more

Но если я вручную введу данные в другую тему testDB1 с помощью

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic testDB1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"measurement","type":"string"},{"name":"timestamp","type":"string"}]}'

Это работает, мой InfxDB может получить данные.

Вот конфигурация подключения:

connector.class=io.confluent.influxdb.InfluxDBSinkConnector
influxdb.url=http://myurl
tasks.max=1
topics=testDB5

конфигурация подключения темы testDB1 такая же, кроме имени темы.

Есть ли проблемы в nifi? Но он может хорошо передавать данные в тему.

Ответы [ 2 ]

1 голос
/ 19 марта 2019

Когда вы используете Avro с Kafka Connect, десериализатор Avro ожидает, что данные были сериализованы с использованием Сериализатора Avro . Это то, что использует kafak-avro-console-producer, поэтому ваш конвейер работает, когда вы его используете.

Эта статья дает хорошее представление об Avro и реестре схем. См. Также Kafka Connect Deep Dive - Объяснение конвертеров и сериализации .

Я не знаком с Nifi, но, глядя на документацию, кажется, что AvroRecordSetWriter имеет возможность использовать Реестр Confluent Schema . Возможно, вы также захотите установить Schema Write Strategy на Confluent Schema Registry Reference.

Как только вы сможете использовать данные из вашей темы с помощью kafka-avro-console-consumer, вы узнаете, что они правильно сериализованы и будут работать с вашим приемником Kafka Connect.

0 голосов
/ 20 марта 2019

Я нашел причину. Это потому, что в Nifi я использовал PublishKafka_0_10 для публикации данных в теме Кафки, но его версия слишком мала!

Когда я делаю запрос в ksql, он говорит, что

Input record ConsumerRecord(..data..) has invalid (negative) timestamp.
Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, 
or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.

Итак, я изменяю его на PublishKafka_1_0 и запускаю заново, и это работает! Мой InfxDB может получить данные. Я потерял дар речи.

И спасибо Робину Моффатту за ответ, он мне очень помог.

...