Мне нужно создать пользовательский разделитель для kafka connect S3-плагин раковины . Я расширил HourlyPartitioner в пользовательском классе, используя kotlin:
class RawDumpHourlyPartitioner<T> : HourlyPartitioner<T>() {
...
}
, и соответственно изменил конфигурацию моего соединителя для использования пользовательского класса:
"partitioner.class": "co.myapp.RawDumpHourlyPartitioner",
Затем я создал наш jar (мы используем shadow ) и включил его в пользовательское docker изображение, основанное на изображении kafka connect (версия изображения совпадает с зависимостями, которые мы используем в проекте). ):
FROM gradle:6.0-jdk8 as builder
WORKDIR /app
ADD . .
RUN gradle clean shadowJar
FROM confluentinc/cp-kafka-connect:5.3.2
COPY --from=builder /app/build/libs/kafka-processor-0.1-all.jar /usr/share/java/kafka/kafka-processor.jar
Когда запускается соединитель, я получаю эту ошибку:
ERROR WorkerSinkTask{id=staging-raw-dump-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.ClassCastException: co.myapp.RawDumpHourlyPartitioner cannot be cast to io.confluent.connect.storage.partitioner.Partitioner
Для двойной проверки я создал файл java, который пытается создать экземпляр класса, и это не так. не выдавайте ошибку:
import io.confluent.connect.storage.partitioner.Partitioner;
public class InstantiateTest {
public static void main(String[] args) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
Class<? extends Partitioner<?>> partitionerClass =
(Class<? extends Partitioner<?>>) Class.forName("co.myapp.RawDumpHourlyPartitioner");
Partitioner<?> partitioner = partitionerClass.newInstance();
}
}