Kafka Connect для назначения разделов по умолчанию использует: DefaultPartitioner
(org.apache.kafka.clients.producer.internals.DefaultPartitioner
)
Если вам нужно переопределить значение по умолчанию другим, это возможно, но вы должны помнить, что переопределение применяется к всем исходным соединителям .
Для этого вам нужно установить свойство producer.partitioner.class
, например producer.partitioner.class=com.example.CustomPartitioner
.
Кроме того, вы должны скопировать jar с вашим разделителем в каталог с библиотеками Kafka Connect.
Способ трансформации:
Настройка раздела также возможна в Transformation, но это неправильный подход.
Начиная с Transformation
у вас нет доступа к метаданным темы, которые имеют решающее значение для назначения разделов.
Если в любом случае вы хотите установить разделы для своих записей, код должен выглядеть следующим образом:
public class AddPartition <R extends ConnectRecord<R>> implements Transformation<R> {
public static final ConfigDef CONFIG_DEF = new ConfigDef();
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
}
@Override
public R apply(R record) {
return record.newRecord(record.topic(), calculatePartition(record), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
}
private Integer calculatePartition(R record) {
// Partitions calcuation based on record information
return 0;
}
@Override
public void close() {
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
}