Как добавить HDFS-коннектор в Kafka Connect API? - PullRequest
0 голосов
/ 14 января 2020

Я новичок в использовании функций Kafka Connect, и у меня возникают трудности с настройкой Kafka и HDFS с помощью Kafka Connect.

Я следую учебному руководству с веб-сайта Debezium, где я могу протестировать новые события и посмотрите, как работает система. Однажды в руководстве они объяснили, как мы можем создать соединитель между MySql и Kafka, я попытался сделать то же самое, но для HDFS.

Я провел онлайн-исследование и запустил следующую команду:

*curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name":"hdfs-sink","config":{"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max":1,
"topics":"dbserver1,dbserver1.inventory.products,dbserver1.inventory.products_on_hand,dbserver1.inventory.customers,dbserver1.inventory.orders",
"hdfs.url":"hdfs://172.18.0.2:9870",
"flush.size":3,
"logs.dir":"logs",
"topics.dir":"kafka",
"format.class":"io.confluent.connect.hdfs.parquet.ParquetFormat",
"partitioner.class":"io.confluent.connect.hdfs.partitioner.DefaultPartitioner",
"partition.field.name":"day"}}'*

В этой команде я добавил topi c, который был автоматически сгенерирован Kafka, URL, который я пытался использовать IP-адрес из namenode контейнера (который я не уверен, если это правильно). В общем, я тестирую, но конечная цель - получить каждое событие в HDFS.

{"error_code":500,"message":"Failed to find any class that implements Connector and which name matches io.confluent.connect.hdfs.HdfsSinkConnector, available connectors are: PluginDesc{klass=class io.confluent.connect.activemq.ActiveMQSourceConnector, name='io.confluent.connect.activemq.ActiveMQSourceConnector', version='5.3.1', encodedVersion=5.3.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-activemq/'}, PluginDesc{klass=class io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, name='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', version='5.3.1', encodedVersion=5.3.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-elasticsearch/'}, PluginDesc{klass=class io.confluent.connect.gcs.GcsSinkConnector, name='io.confluent.connect.gcs.GcsSinkConnector', version='5.0.3', encodedVersion=5.0.3, type=sink, typeName='sink', location='file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-gcs/'}, PluginDesc{klass=class io.confluent.connect.ibm.mq.IbmMQSourceConnector, name='io.confluent.connect.ibm.mq.IbmMQSourceConnector', version='5.3.1', encodedVersion=5.3.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-ibmmq/'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSinkConnector, name='io.confluent.connect.jdbc.JdbcSinkConnector', version='5.3.1', encodedVersion=5.3.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-jdbc/'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSourceConnector, name='io.confluent.connect.jdbc.JdbcSourceConnector', version='5.3.1', encodedVersion=5.3.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-jdbc/'}, PluginDesc{klass=class io.confluent.connect.jms.JmsSourceConnector, name='io.confluent.connect.jms.JmsSourceConnector', version='5.3.1', encodedVersion=5.3.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-activemq/'}, PluginDesc{klass=class io.confluent.connect.s3.S3SinkConnector, name='io.confluent.connect.s3.S3SinkConnector', version='5.3.1', encodedVersion=5.3.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-s3/'}, PluginDesc{klass=class io.confluent.connect.storage.tools.SchemaSourceConnector, name='io.confluent.connect.storage.tools.SchemaSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-s3/'}, PluginDesc{klass=class io.confluent.kafka.connect.datagen.DatagenConnector, name='io.confluent.kafka.connect.datagen.DatagenConnector', version='null', encodedVersion=null, type=source, typeName='source', location='file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=sink, typeName='sink', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=connector, typeName='connector', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=sink, typeName='sink', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}"}

Это ошибка терминала, полагаю, что плагин для HDFS не установлен должным образом (я следовал за многими примерами онлайн, но все еще не уверен, правильно ли установлен).

Я не уверен, действительно ли нужен этот плагин от Confluent?

Я не знаю, установлен ли HDFS из docker, это тоже хорошая идея?

Надеюсь, вы поделитесь своими знаниями по этому вопросу, спасибо заранее.

Ссылка на учебник: https://debezium.io/documentation/reference/1.0/tutorial.html

Ответы [ 3 ]

0 голосов
/ 15 января 2020

Разъем HDFS2 Sink Connector устарел и удален из установки Confluent Platform.

Вы все еще можете найти его и установить из Confluent Hub, и я бы порекомендовал использовать официальный сайт Apache Kafka, чтобы узнать о ядре Kafka Connect чаще, чем где-либо еще

0 голосов
/ 17 января 2020

Проверьте эту ссылку на промежуточных документах Confluent для HDFS 2 Sink Connector: https://docs.confluent.io/current/connect/kafka-connect-hdfs/index.html

Он охватывает почти все, а также поможет вам обойтись, если вы используете Confluent Платформа.

Образ docker обеспечивает надежный способ работы с HDFS. Используйте это изображение: https://hub.docker.com/r/sequenceiq/hadoop-docker/

0 голосов
/ 15 января 2020

Добро пожаловать в StackOverflow!

Существует проблема с установкой плагина. Поэтому, прежде всего, проверьте это с помощью интерфейса REST Kafka Connect (подробности здесь ). И тогда вы можете установить разъем вручную .

...