как сделать равномерное распределение сообщений на всех разделах - PullRequest
0 голосов
/ 31 августа 2018

enter image description here Я использую kafka, отправляю сообщение брокеру kafka, мой номер раздела 24, я хочу отправлять сообщения равномерного распределения на 24 раздела. теперь мой ключ как,

String topicName="data_"+region;
JSONObject jsonObject = JSON.parseObject(json);
Random rand = new Random();
int  n = rand.nextInt(50) + 1;
ListenableFuture<SendResult<Integer, String>> result =kafkaTemplate.send(topicName,type+n,jsonObject.toJSONString());

но эти сообщения не равномерного распределения. как спроектировать мой ключ? хэш-значение или что-то еще? Спасибо за ваше предложение!

1 Ответ

0 голосов
/ 31 августа 2018

Ну, краткий ответ: префикс вашего ключа (type+n) является виновником. Но почему? Ну, я не уверен, потому что сегодня я оставил свою математику дома: -)

Тем не менее, давайте заглянем под капот! Когда вы используете ключ для своих записей (что я настоятельно рекомендую, потому что вы, возможно, захотите использовать сжатие журналов позже), и вы пишете свое приложение на Java или Spring Kafka, раздел, в котором ваша запись закончится, определяется библиотеками Java Kafka. Более конкретно, принимающий решение - реализация по умолчанию org.apache.kafka.clients.producer.Partitioner. Эта реализация org.apache.kafka.clients.producer.internals.DefaultPartitioner. Смотри здесь .

Вот как фактически рассчитывается раздел:

return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

Используемая хеш-функция: murmur2 . Давайте напишем короткий фрагмент, который будет эмулировать распределение 10K записей по 24 разделам, если вы префиксите свой ключ (как вы делаете):

Random rand = new Random();

Map<Integer, Integer> distro = new HashMap<>();
    for (int i = 0; i < 10000; i++) {
      int n = rand.nextInt(50) + 1;

      int partition = Utils.toPositive(Utils.murmur2(("type_" + String.valueOf(n)).getBytes())) % 24;
      int cnt = distro.getOrDefault(partition, 0) + 1;

      distro.put(partition, cnt);
    }

    distro.entrySet().forEach(e-> System.out.println("Partition= " + e.getKey() + " Entries= " + e.getValue()));

Вот плохой дистрибутив, с которым вы столкнулись:

Partition= 2 Entries= 180
Partition= 4 Entries= 388
Partition= 5 Entries= 813
Partition= 6 Entries= 1438
Partition= 7 Entries= 572
Partition= 9 Entries= 791
Partition= 10 Entries= 1036
Partition= 12 Entries= 815
Partition= 14 Entries= 184
Partition= 15 Entries= 579
Partition= 16 Entries= 608
Partition= 18 Entries= 610
Partition= 19 Entries= 215
Partition= 20 Entries= 562
Partition= 21 Entries= 395
Partition= 22 Entries= 370
Partition= 23 Entries= 444

Как видите, некоторые разделы даже не заполнены, а разделы 10 и 6 немного перегружены. Теперь давайте удалим префикс из вашего маленького ключа следующим образом:

int partition = Utils.toPositive(Utils.murmur2((String.valueOf(n)).getBytes())) % 24;

Сейчас все выглядит немного более равномерно, но все еще не идеально:

Partition= 0 Entries= 799
Partition= 1 Entries= 411
Partition= 3 Entries= 835
Partition= 4 Entries= 224
Partition= 5 Entries= 563
Partition= 6 Entries= 591
Partition= 7 Entries= 812
Partition= 8 Entries= 596
Partition= 10 Entries= 211
Partition= 11 Entries= 424
Partition= 12 Entries= 608
Partition= 13 Entries= 225
Partition= 14 Entries= 187
Partition= 15 Entries= 786
Partition= 16 Entries= 584
Partition= 18 Entries= 606
Partition= 19 Entries= 425
Partition= 21 Entries= 159
Partition= 22 Entries= 554
Partition= 23 Entries= 400

Вы можете использовать UUID для ваших ключей, как мы, например ::1010

int partition = Utils.toPositive(Utils.murmur2(UUID.randomUUID().toString().getBytes())) % 24;

И это работает довольно гладко с шумом2:

Partition= 0 Entries= 429
Partition= 1 Entries= 407
Partition= 2 Entries= 420
Partition= 3 Entries= 435
Partition= 4 Entries= 407
Partition= 5 Entries= 421
Partition= 6 Entries= 403
Partition= 7 Entries= 460
Partition= 8 Entries= 399
Partition= 9 Entries= 415
Partition= 10 Entries= 386
Partition= 11 Entries= 402
Partition= 12 Entries= 424
Partition= 13 Entries= 434
Partition= 14 Entries= 391
Partition= 15 Entries= 426
Partition= 16 Entries= 399
Partition= 17 Entries= 430
Partition= 18 Entries= 435
Partition= 19 Entries= 418
Partition= 20 Entries= 403
Partition= 21 Entries= 418
Partition= 22 Entries= 402
Partition= 23 Entries= 436

Другим вариантом является увеличение диапазона вашего ключа, который в настоящее время до 50.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...