Kafka Connect не может преобразовать пользовательский разделитель приемника в интерфейс Partitioner - PullRequest
0 голосов
/ 13 февраля 2020

Мне нужно создать пользовательский разделитель для kafka connect S3-плагин раковины . Я расширил HourlyPartitioner в пользовательском классе, используя kotlin:

class RawDumpHourlyPartitioner<T> : HourlyPartitioner<T>() {
...
}

, и соответственно изменил конфигурацию моего соединителя для использования пользовательского класса:

"partitioner.class": "co.myapp.RawDumpHourlyPartitioner",

Затем я создал наш jar (мы используем shadow ) и включил его в пользовательское docker изображение, основанное на изображении kafka connect (версия изображения совпадает с зависимостями, которые мы используем в проекте). ):

FROM gradle:6.0-jdk8 as builder
WORKDIR /app
ADD . .
RUN gradle clean shadowJar

FROM confluentinc/cp-kafka-connect:5.3.2

COPY --from=builder /app/build/libs/kafka-processor-0.1-all.jar /usr/share/java/kafka/kafka-processor.jar

Когда запускается соединитель, я получаю эту ошибку:

ERROR WorkerSinkTask{id=staging-raw-dump-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.ClassCastException: co.myapp.RawDumpHourlyPartitioner cannot be cast to io.confluent.connect.storage.partitioner.Partitioner

Для двойной проверки я создал файл java, который пытается создать экземпляр класса, и это не так. не выдавайте ошибку:

import io.confluent.connect.storage.partitioner.Partitioner;

public class InstantiateTest {
    public static void main(String[] args) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        Class<? extends Partitioner<?>> partitionerClass =
                (Class<? extends Partitioner<?>>) Class.forName("co.myapp.RawDumpHourlyPartitioner");

        Partitioner<?> partitioner = partitionerClass.newInstance();
    }
}

1 Ответ

1 голос
/ 13 февраля 2020

Рассматривая руководство kafka connect , оно говорит:

Плагин Kafka Connect - это просто набор JAR-файлов, где Kafka Connect может найти реализацию одного или нескольких коннекторов. , преобразовывает и / или преобразователи. Kafka Connect изолирует каждый плагин друг от друга, так что библиотеки в одном плагине не подвержены влиянию библиотек в других плагинах. Это очень важно при смешивании и сопоставлении соединителей от нескольких провайдеров.

Это означает, что, поскольку я использую соединитель приемника S3, я должен поместить свой jar с пользовательским разделителем в каталог Плагин S3.

Перемещение файла jar на /usr/share/java/kafka-connect-s3 решило проблему

В комментариях я упомянул, что мой jar также включает в себя собственную стратегию имени субъекта, которую мы используем в основной кафке -connect config (переменные env), в этом случае jar должен находиться в папке /usr/share/java/kafka

Update : как упоминалось cricket_007 , лучше поставить пользовательский разделитель jar в папку /usr/share/java/kafka-connect-storage-common, где находятся все остальные разделители

...