Kafka Connect HDFS Sink Connector NullPointerException
1 vote
/ 05 July 2019

While trying to use the Kafka Connect HDFS Sink connector, following the Confluent documentation at https://docs.confluent.io/current/connect/kafka-connect-hdfs/index.html, we keep running into

java.lang.NullPointerException

The following information is attached below:

  1. etc/connect-distributed.properties,
  2. the error stack trace,
  3. the connector REST API call (a status-check sketch follows this list).

We have also tried connect-standalone.properties, connect-distributed.properties, and quickstart-hdfs.properties. Any help would be greatly appreciated. Thanks.
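For reference, connector and task state can be inspected through the Connect REST API. A minimal status check, assuming the worker's default REST port (8083) and the connector name registered further down, would look like this:

# List the connectors registered on this worker.
curl -s http://localhost:8083/connectors

# Show connector and per-task state; a FAILED task includes its stack trace.
curl -s http://localhost:8083/connectors/hdfs-sink-connect-6/status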

We have tried various sources and sinks with a number of different data sources. The Avro round trip through the test topic itself works:

kafka-avro-console-producer --broker-list localhost:9092 --topic test_hdfs --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}

kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic test_hdfs  --from-beginning
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}

Connector configuration
-------------------------
curl -i -X POST -H "Content-Type: application/json" http://localhost:8083/connectors/ -d '{
  "name": "hdfs-sink-connect-6",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "key.converter.schemas.enable": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schemas.enable": "true",
    "topics": "test_hdfs",
    "hdfs.url": "hdfs://localhost:9000",
    "flush.size": "3",
    "key.ignore": "true"
  }
}'
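One thing worth noting: in distributed mode the worker properties file only configures the worker itself, and per-connector settings have to travel in the REST payload. A sketch that merges the connector-level keys from the connect-distributed.properties shown below into the POST body (the connector name here is hypothetical) would be:

curl -i -X POST -H "Content-Type: application/json" http://localhost:8083/connectors/ -d '{
  "name": "hdfs-sink-connect-7",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "test_hdfs",
    "hdfs.url": "hdfs://localhost:9000",
    "hadoop.conf.dir": "/usr/local/hadoop/hadoop-2.8.5/etc/hadoop",
    "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
    "flush.size": "3",
    "rotate.interval.ms": "100000",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}'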

----------------------------------------------------------------------------------------------------------------------------------------------

connect-distributed.properties
----------------------------

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs
hdfs.url=hdfs://localhost:9000
flush.size=3
hadoop.conf.dir=/usr/local/hadoop/hadoop-2.8.5/etc/hadoop
rotate.interval.ms=100000
format.class=io.confluent.connect.hdfs.avro.AvroFormat

# Converter-specific settings can be passed in by prefixing the converter's setting with the converter we want to apply it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://localhost:8081
key.converter.schema.registry.url=http://localhost:8081
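For the standalone runs mentioned above, the invocation we would expect (paths assume the stock Confluent package layout) is roughly:

# Standalone mode reads the connector definition from a properties file
# passed on the command line instead of from a REST call.
connect-standalone \
  /etc/kafka/connect-standalone.properties \
  /etc/kafka-connect-hdfs/quickstart-hdfs.properties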


-----------------------------------------------------------------------------------------------------------------------------------------------

Error stack trace
----------------

"java.lang.NullPointerException at io.confluent.connect.hdfs.HdfsSinkTask.open(HdfsSinkTask.java:142) at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:586) at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:67) at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:646) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:292) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:410) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:344) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:342) at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:443) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:316) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)"
...