Я новичок в использовании функций Kafka Connect, и у меня возникают трудности с настройкой Kafka и HDFS с помощью Kafka Connect.
Я следую учебному руководству с веб-сайта Debezium, где я могу протестировать новые события и посмотрите, как работает система. Однажды в руководстве они объяснили, как мы можем создать соединитель между MySql и Kafka, я попытался сделать то же самое, но для HDFS.
Я провел онлайн-исследование и запустил следующую команду:
*curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name":"hdfs-sink","config":{"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max":1,
"topics":"dbserver1,dbserver1.inventory.products,dbserver1.inventory.products_on_hand,dbserver1.inventory.customers,dbserver1.inventory.orders",
"hdfs.url":"hdfs://172.18.0.2:9870",
"flush.size":3,
"logs.dir":"logs",
"topics.dir":"kafka",
"format.class":"io.confluent.connect.hdfs.parquet.ParquetFormat",
"partitioner.class":"io.confluent.connect.hdfs.partitioner.DefaultPartitioner",
"partition.field.name":"day"}}'*
В этой команде я добавил topi c, который был автоматически сгенерирован Kafka, URL, который я пытался использовать IP-адрес из namenode контейнера (который я не уверен, если это правильно). В общем, я тестирую, но конечная цель - получить каждое событие в HDFS.
{"error_code":500,"message":"Failed to find any class that implements Connector and which name matches io.confluent.connect.hdfs.HdfsSinkConnector, available connectors are: PluginDesc{klass=class io.confluent.connect.activemq.ActiveMQSourceConnector, name='io.confluent.connect.activemq.ActiveMQSourceConnector', version='5.3.1', encodedVersion=5.3.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-activemq/'}, PluginDesc{klass=class io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, name='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', version='5.3.1', encodedVersion=5.3.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-elasticsearch/'}, PluginDesc{klass=class io.confluent.connect.gcs.GcsSinkConnector, name='io.confluent.connect.gcs.GcsSinkConnector', version='5.0.3', encodedVersion=5.0.3, type=sink, typeName='sink', location='file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-gcs/'}, PluginDesc{klass=class io.confluent.connect.ibm.mq.IbmMQSourceConnector, name='io.confluent.connect.ibm.mq.IbmMQSourceConnector', version='5.3.1', encodedVersion=5.3.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-ibmmq/'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSinkConnector, name='io.confluent.connect.jdbc.JdbcSinkConnector', version='5.3.1', encodedVersion=5.3.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-jdbc/'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSourceConnector, name='io.confluent.connect.jdbc.JdbcSourceConnector', version='5.3.1', encodedVersion=5.3.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-jdbc/'}, PluginDesc{klass=class io.confluent.connect.jms.JmsSourceConnector, name='io.confluent.connect.jms.JmsSourceConnector', version='5.3.1', encodedVersion=5.3.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-activemq/'}, PluginDesc{klass=class io.confluent.connect.s3.S3SinkConnector, name='io.confluent.connect.s3.S3SinkConnector', version='5.3.1', encodedVersion=5.3.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-s3/'}, PluginDesc{klass=class io.confluent.connect.storage.tools.SchemaSourceConnector, name='io.confluent.connect.storage.tools.SchemaSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-s3/'}, PluginDesc{klass=class io.confluent.kafka.connect.datagen.DatagenConnector, name='io.confluent.kafka.connect.datagen.DatagenConnector', version='null', encodedVersion=null, type=source, typeName='source', location='file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=sink, typeName='sink', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=connector, typeName='connector', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=sink, typeName='sink', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}"}
Это ошибка терминала, полагаю, что плагин для HDFS не установлен должным образом (я следовал за многими примерами онлайн, но все еще не уверен, правильно ли установлен).
Я не уверен, действительно ли нужен этот плагин от Confluent?
Я не знаю, установлен ли HDFS из docker, это тоже хорошая идея?
Надеюсь, вы поделитесь своими знаниями по этому вопросу, спасибо заранее.
Ссылка на учебник: https://debezium.io/documentation/reference/1.0/tutorial.html