Вы не можете прослушивать только определенный раздел в Kafka Connect.
Но вы можете добиться функциональности вставки сообщений только из определенных разделов.
Чтобы иметь такую функцию, вам нужно реализовать свой пользовательский Transformation
.
Если Transformation
возвращает null
, сообщение пропущено, поэтому ваш пользовательский Transformation
должен вернуть null
для нежелательных разделов.
Пример кода будет следующим:
public class PartitionFilter <R extends ConnectRecord<R>> implements Transformation<R> {
public static final ConfigDef CONFIG_DEF = new ConfigDef();
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
}
@Override
public R apply(R record) {
int neededPartition = 1; // some parititon
if (record.kafkaPartition() != neededPartition)
return null;
return record;
}
@Override
public void close() {
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
}
Более подробную информацию о преобразованиях можно найти: https://kafka.apache.org/documentation/#connect_transforms