Ну, краткий ответ: префикс вашего ключа (type+n)
является виновником. Но почему? Ну, я не уверен, потому что сегодня я оставил свою математику дома: -)
Тем не менее, давайте заглянем под капот! Когда вы используете ключ для своих записей (что я настоятельно рекомендую, потому что вы, возможно, захотите использовать сжатие журналов позже), и вы пишете свое приложение на Java или Spring Kafka, раздел, в котором ваша запись закончится, определяется библиотеками Java Kafka. Более конкретно, принимающий решение - реализация по умолчанию org.apache.kafka.clients.producer.Partitioner
. Эта реализация org.apache.kafka.clients.producer.internals.DefaultPartitioner
. Смотри здесь .
Вот как фактически рассчитывается раздел:
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
Используемая хеш-функция: murmur2 . Давайте напишем короткий фрагмент, который будет эмулировать распределение 10K записей по 24 разделам, если вы префиксите свой ключ (как вы делаете):
Random rand = new Random();
Map<Integer, Integer> distro = new HashMap<>();
for (int i = 0; i < 10000; i++) {
int n = rand.nextInt(50) + 1;
int partition = Utils.toPositive(Utils.murmur2(("type_" + String.valueOf(n)).getBytes())) % 24;
int cnt = distro.getOrDefault(partition, 0) + 1;
distro.put(partition, cnt);
}
distro.entrySet().forEach(e-> System.out.println("Partition= " + e.getKey() + " Entries= " + e.getValue()));
Вот плохой дистрибутив, с которым вы столкнулись:
Partition= 2 Entries= 180
Partition= 4 Entries= 388
Partition= 5 Entries= 813
Partition= 6 Entries= 1438
Partition= 7 Entries= 572
Partition= 9 Entries= 791
Partition= 10 Entries= 1036
Partition= 12 Entries= 815
Partition= 14 Entries= 184
Partition= 15 Entries= 579
Partition= 16 Entries= 608
Partition= 18 Entries= 610
Partition= 19 Entries= 215
Partition= 20 Entries= 562
Partition= 21 Entries= 395
Partition= 22 Entries= 370
Partition= 23 Entries= 444
Как видите, некоторые разделы даже не заполнены, а разделы 10 и 6 немного перегружены. Теперь давайте удалим префикс из вашего маленького ключа следующим образом:
int partition = Utils.toPositive(Utils.murmur2((String.valueOf(n)).getBytes())) % 24;
Сейчас все выглядит немного более равномерно, но все еще не идеально:
Partition= 0 Entries= 799
Partition= 1 Entries= 411
Partition= 3 Entries= 835
Partition= 4 Entries= 224
Partition= 5 Entries= 563
Partition= 6 Entries= 591
Partition= 7 Entries= 812
Partition= 8 Entries= 596
Partition= 10 Entries= 211
Partition= 11 Entries= 424
Partition= 12 Entries= 608
Partition= 13 Entries= 225
Partition= 14 Entries= 187
Partition= 15 Entries= 786
Partition= 16 Entries= 584
Partition= 18 Entries= 606
Partition= 19 Entries= 425
Partition= 21 Entries= 159
Partition= 22 Entries= 554
Partition= 23 Entries= 400
Вы можете использовать UUID для ваших ключей, как мы, например ::1010
int partition = Utils.toPositive(Utils.murmur2(UUID.randomUUID().toString().getBytes())) % 24;
И это работает довольно гладко с шумом2:
Partition= 0 Entries= 429
Partition= 1 Entries= 407
Partition= 2 Entries= 420
Partition= 3 Entries= 435
Partition= 4 Entries= 407
Partition= 5 Entries= 421
Partition= 6 Entries= 403
Partition= 7 Entries= 460
Partition= 8 Entries= 399
Partition= 9 Entries= 415
Partition= 10 Entries= 386
Partition= 11 Entries= 402
Partition= 12 Entries= 424
Partition= 13 Entries= 434
Partition= 14 Entries= 391
Partition= 15 Entries= 426
Partition= 16 Entries= 399
Partition= 17 Entries= 430
Partition= 18 Entries= 435
Partition= 19 Entries= 418
Partition= 20 Entries= 403
Partition= 21 Entries= 418
Partition= 22 Entries= 402
Partition= 23 Entries= 436
Другим вариантом является увеличение диапазона вашего ключа, который в настоящее время до 50.