Упаковка пользовательского плагина Java `partitioner.class` для Kafka Connect в Confluent 4.1 + Kafka 1.1? - PullRequest
0 голосов
/ 01 мая 2018

Я успешно использую простой пользовательский класс Partitioner, написанный на Java, для приемника Kafka Connect на Confluent 3.2.x (Kafka 0.10.x). Я хочу перейти на Confluent 4.1 (Kafka 1.1) и у меня возникают ошибки.

Механизм загрузки плагинов Kafka Connect, похоже, был изменен в CP 3.3.0. Раньше была только опция CLASSPATH, но с CP 3.3.0+ есть более новый и рекомендуемый механизм plugin.path.

Если я пытаюсь продолжать использовать устаревший механизм плагинов CLASSPATH, когда я пытаюсь использовать мой плагин, я получаю:

java.lang.ClassNotFoundException: io.confluent.connect.storage.partitioner.DefaultPartitioner

Это внутренний класс CP. С более старым CP 3.2.x, который был доступен на classpath, однако с новыми усилиями по изоляции classpath в CP> = 3.3.0, я предполагаю, что это должно быть предоставлено вместе с плагином.

Полагаю, разумно перейти на более новый рекомендуемый механизм plugin.path. Я удаляю запись CLASSPATH. По умолчанию /etc/kafka/connect-distributed.properties я вижу plugin.path=/usr/share/java, поэтому я устанавливаю свой плагин .jar в /usr/share/java/my-custom-partitioner/my-custom-partitioner.jar. Я также попытался добавить и не добавлять файлы .jar зависимостей.

Кажется, что мой плагин загружается при запуске службы Kafka Connect:

INFO Loading plugin from: /usr/share/java/my-custom-partitioner (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:194)
INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/my-custom-partitioner/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)

Когда я делаю:

curl -X PUT -H "Content-Type: application/json" --data-binary "@sink_test_1.json" my-dev-test-vm:8083/connectors/sink-test-1/config

Я получаю:

{"error_code":500,"message":null}%             

Я вижу в журнале kafka connect systemd:

java.lang.NullPointerException
at io.confluent.connect.storage.partitioner.PartitionerConfig.classNameEquals(PartitionerConfig.java:270)
at io.confluent.connect.storage.partitioner.PartitionerConfig.access$000(PartitionerConfig.java:33)
at io.confluent.connect.storage.partitioner.PartitionerConfig$PartitionerClassDependentsRecommender.visible(PartitionerConfig.java:238)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:617)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:625)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:525)
at org.apache.kafka.common.config.ConfigDef.validateAll(ConfigDef.java:508)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:490)
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:133)

Непонятно, что происходит неправильно или почему мой класс разделителя загружается неправильно.

К вашему сведению, я перестроил свой плагин Java с зависимостями CP 4.1 + Kafka 1.1 и сделал небольшие обновления, чтобы соответствовать изменениям API, таким как добавление реализации для getSchemaGeneratorClass в мой класс разделителя.

1 Ответ

0 голосов
/ 04 мая 2018

Пользовательские классы Kafka Connect Partitioner не будут работать через старый механизм CLASSPATH, и они не будут работать как плагины с более новым механизмом изолированных плагинов Kafka 0.11.0+.

Единственное рабочее решение - скопировать ваш пользовательский файл .jar с вашим пользовательским классом Kafka Connect Partitioner в директорию плагинов kafka-connect-storage-common по адресу /usr/share/java/kafka-connect-storage-common/. Пользовательские классы плагинов Kafka Connect Partitioner должны существовать в том же каталоге, чтобы они находились в одном изолированном загрузчике классов.

К вашему сведению, вы можете видеть, что механизм изолированных плагинов Kafka 0.11.0+ будет загружать только подклассы четырех конкретных классов Java, которые не охватывают разделители Kafka Connect, здесь:

https://github.com/apache/kafka/blob/fdcf75ea326b8e07d8d7c4f5cc14f1e70451bcd4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L279

Спасибо cricket_007 за то, что порекомендовали это точное решение: размещение пользовательских файлов .jar разделителя Kafka Connect в каталоге /share/java/kafka-storage-common. Я на собственном опыте понял, почему это нужно делать и почему альтернативы не работают.

...