kafka включить по умолчанию круговой разделитель робина-er - PullRequest
0 голосов
/ 07 июля 2019

В одном из моих приложений мне нужно применить стратегию разделения ключей Round Robbin на моем производителе кафки.

Запись в другой раздел работает только с настройками ниже (1) :

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyRandomPartioner.class);

И класс MyRandomPartitioner реализован следующим образом:

public class MyRandomPartioner implements Partitioner {
    private Logger logger = LoggerFactory.getLogger(MyRandomPartitioner.class);

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        logger.info(" Partition of Topic :" + numPartitions);
            Random randomGenerator = new Random();
            int randomInt = randomGenerator.nextInt(4) + 1;
            logger.info(" selected Partition of Topic :" + randomInt);
            return  randomInt;

    }

    @Override
    public void close() {
    }

} 

Поскольку я хочу иметь равное распределение, я отключил вышеупомянутые реквизиты (1) , тогда он всегда записывает в один раздел.

Код моего производителя:

void sendData(String operation, String message){
final ProducerRecord<String, String> record = new ProducerRecord<String, String>(producerKafkaConfig.getTopicName(), operation,message);
            producer.send(record, new ProducerCallback()); 
        }
//Here operation is always fixed and message is my actual content. 

1 Ответ

1 голос
/ 08 июля 2019

Поскольку ваши записи являются ключом и значением, разделитель по умолчанию проверит ключ, если ключа нет, то только он будет выполнять нормальное разбиение, в противном случае хеш будет рассчитываться на основе ключа.

Incase из ключа не может быть удален для ваших записей, вы можете использовать следующий код разделителя

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] 

    valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();

            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }

        }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...