В одном из моих приложений мне нужно применить стратегию разделения ключей Round Robbin на моем производителе кафки.
Запись в другой раздел работает только с настройками ниже (1) :
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyRandomPartioner.class);
И класс MyRandomPartitioner
реализован следующим образом:
public class MyRandomPartioner implements Partitioner {
private Logger logger = LoggerFactory.getLogger(MyRandomPartitioner.class);
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
logger.info(" Partition of Topic :" + numPartitions);
Random randomGenerator = new Random();
int randomInt = randomGenerator.nextInt(4) + 1;
logger.info(" selected Partition of Topic :" + randomInt);
return randomInt;
}
@Override
public void close() {
}
}
Поскольку я хочу иметь равное распределение, я отключил вышеупомянутые реквизиты (1) , тогда он всегда записывает в один раздел.
Код моего производителя:
void sendData(String operation, String message){
final ProducerRecord<String, String> record = new ProducerRecord<String, String>(producerKafkaConfig.getTopicName(), operation,message);
producer.send(record, new ProducerCallback());
}
//Here operation is always fixed and message is my actual content.