Пользовательское назначение раздела в коннекторе Kafka JDBC - PullRequest
3 голосов
/ 15 марта 2019

У меня есть случай, когда мне нужно написать собственную логику для назначения раздела на основе определенных ключевых параметров из сообщения. Я провел некоторое исследование по этому вопросу и обнаружил, что преобразование kafka поддерживает переопределение некоторых методов в интерфейсе Transformation, но я не смог сделать некоторый пример кода в git hub или где-то еще. кто-то может поделиться примером кода или ссылкой на git hub, чтобы выполнить пользовательское назначение раздела в исходном соединителе JDBC kafka?

Заранее спасибо!.

1 Ответ

2 голосов
/ 15 марта 2019

Kafka Connect для назначения разделов по умолчанию использует: DefaultPartitioner (org.apache.kafka.clients.producer.internals.DefaultPartitioner)

Если вам нужно переопределить значение по умолчанию другим, это возможно, но вы должны помнить, что переопределение применяется к всем исходным соединителям . Для этого вам нужно установить свойство producer.partitioner.class, например producer.partitioner.class=com.example.CustomPartitioner. Кроме того, вы должны скопировать jar с вашим разделителем в каталог с библиотеками Kafka Connect.

Способ трансформации:

Настройка раздела также возможна в Transformation, но это неправильный подход. Начиная с Transformation у вас нет доступа к метаданным темы, которые имеют решающее значение для назначения разделов.

Если в любом случае вы хотите установить разделы для своих записей, код должен выглядеть следующим образом:

public class AddPartition <R extends ConnectRecord<R>> implements Transformation<R> {

    public static final ConfigDef CONFIG_DEF = new ConfigDef();

    @Override
    public void configure(Map<String, ?> props) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    }

    @Override
    public R apply(R record) {
        return record.newRecord(record.topic(), calculatePartition(record), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
    }

    private Integer calculatePartition(R record) {
        // Partitions calcuation based on record information
        return 0;
    }

    @Override
    public void close() {
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }
}
...