While trying to set up the Kafka Connect HDFS Sink following the Confluent documentation (https://docs.confluent.io/current/connect/kafka-connect-hdfs/index.html), we run into a
java.lang.NullPointerException
The following information is attached below:
- etc/connect-distributed.properties,
- the error stack trace,
- the connector REST API call.
We have tried connect-standalone.properties, connect-distributed.properties, and quickstart-hdfs.properties. Any help would be greatly appreciated.
Thanks
We have experimented with a number of source and sink connectors against different data sources and sinks. Producing and consuming Avro records on the test_hdfs topic works as expected:
kafka-avro-console-producer --broker-list localhost:9092 --topic test_hdfs --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic test_hdfs --from-beginning
{"f1":"value1"}
{"f1":"value2"}
{"f1":"value3"}
Connector configuration
-----------------------
curl -i -X POST -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
  "name": "hdfs-sink-connect-6",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",
    "topics": "test_hdfs",
    "hdfs.url": "hdfs://localhost:9000",
    "flush.size": "3",
    "key.ignore": "true"
  }
}'
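Once the connector is created, its state and any task error can be retrieved through the standard Connect REST endpoints; between attempts the connector can be dropped and re-created the same way:

curl http://localhost:8083/connectors/hdfs-sink-connect-6/status
curl -X DELETE http://localhost:8083/connectors/hdfs-sink-connect-6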
----------------------------------------------------------------------------------------------------------------------------------------------
connect-distributed.properties
------------------------------
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs
hdfs.url=hdfs://localhost:9000
flush.size=3
hadoop.conf.dir=/usr/local/hadoop/hadoop-2.8.5/etc/hadoop
rotate.interval.ms=100000
format.class=io.confluent.connect.hdfs.avro.AvroFormat
# Converter-specific settings can be passed in by prefixing the setting
# with the converter we want to apply it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://localhost:8081
key.converter.schema.registry.url=http://localhost:8081
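As a sanity check, HDFS reachability from the Connect host can be verified by listing the root and the connector's output directory (assuming the hdfs CLI is on the PATH; the HDFS sink writes under topics.dir, which defaults to topics):

hdfs dfs -ls hdfs://localhost:9000/
hdfs dfs -ls hdfs://localhost:9000/topics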
-----------------------------------------------------------------------------------------------------------------------------------------------
Error Stack Trace
-----------------
java.lang.NullPointerException
    at io.confluent.connect.hdfs.HdfsSinkTask.open(HdfsSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:586)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:67)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:646)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:292)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:410)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:344)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:342)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:443)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:316)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)