Я успешно использую простой пользовательский класс Partitioner, написанный на Java, для приемника Kafka Connect на Confluent 3.2.x (Kafka 0.10.x). Я хочу перейти на Confluent 4.1 (Kafka 1.1) и у меня возникают ошибки.
Механизм загрузки плагинов Kafka Connect, похоже, был изменен в CP 3.3.0. Раньше была только опция CLASSPATH, но с CP 3.3.0+ есть более новый и рекомендуемый механизм plugin.path
.
Если я пытаюсь продолжать использовать устаревший механизм плагинов CLASSPATH, когда я пытаюсь использовать мой плагин, я получаю:
java.lang.ClassNotFoundException: io.confluent.connect.storage.partitioner.DefaultPartitioner
Это внутренний класс CP. С более старым CP 3.2.x, который был доступен на classpath, однако с новыми усилиями по изоляции classpath в CP> = 3.3.0, я предполагаю, что это должно быть предоставлено вместе с плагином.
Полагаю, разумно перейти на более новый рекомендуемый механизм plugin.path
. Я удаляю запись CLASSPATH. По умолчанию /etc/kafka/connect-distributed.properties
я вижу plugin.path=/usr/share/java
, поэтому я устанавливаю свой плагин .jar в /usr/share/java/my-custom-partitioner/my-custom-partitioner.jar
. Я также попытался добавить и не добавлять файлы .jar зависимостей.
Кажется, что мой плагин загружается при запуске службы Kafka Connect:
INFO Loading plugin from: /usr/share/java/my-custom-partitioner (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:194)
INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/my-custom-partitioner/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)
Когда я делаю:
curl -X PUT -H "Content-Type: application/json" --data-binary "@sink_test_1.json" my-dev-test-vm:8083/connectors/sink-test-1/config
Я получаю:
{"error_code":500,"message":null}%
Я вижу в журнале kafka connect systemd:
java.lang.NullPointerException
at io.confluent.connect.storage.partitioner.PartitionerConfig.classNameEquals(PartitionerConfig.java:270)
at io.confluent.connect.storage.partitioner.PartitionerConfig.access$000(PartitionerConfig.java:33)
at io.confluent.connect.storage.partitioner.PartitionerConfig$PartitionerClassDependentsRecommender.visible(PartitionerConfig.java:238)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:617)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:625)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:525)
at org.apache.kafka.common.config.ConfigDef.validateAll(ConfigDef.java:508)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:490)
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:133)
Непонятно, что происходит неправильно или почему мой класс разделителя загружается неправильно.
К вашему сведению, я перестроил свой плагин Java с зависимостями CP 4.1 + Kafka 1.1 и сделал небольшие обновления, чтобы соответствовать изменениям API, таким как добавление реализации для getSchemaGeneratorClass
в мой класс разделителя.