I have a consumer group reading from a topic with ten partitions:
[root@kafka01 kafka]# ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group ssIncomingGroup --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
ssIncomingGroup ssIncoming 3 13688 13987 299 ssTS@influx01 (github.com/segmentio/kafka-go)-f1c5b4c7-9cf0-4132-902a-db9d0429d520 /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 7 13484 13868 384 ssTS@influx01 (github.com/segmentio/kafka-go)-f1c5b4c7-9cf0-4132-902a-db9d0429d520 /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 2 13322 13698 376 ssTS@influx01 (github.com/segmentio/kafka-go)-20ee82a9-825d-4d9a-9f20-f4610c21f171 /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 8 13612 13899 287 ssTS@influx01 (github.com/segmentio/kafka-go)-20ee82a9-825d-4d9a-9f20-f4610c21f171 /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 1 13568 13932 364 ssTS@influx01 (github.com/segmentio/kafka-go)-df68ca85-d722-47ef-82c2-2fd60e186fac /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 6 13651 13950 299 ssTS@influx01 (github.com/segmentio/kafka-go)-df68ca85-d722-47ef-82c2-2fd60e186fac /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 0 13609 13896 287 ssTS@influx01 (github.com/segmentio/kafka-go)-10b7f10f-9535-4338-9851-f583a9a7c935 /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 5 13646 13945 299 ssTS@influx01 (github.com/segmentio/kafka-go)-10b7f10f-9535-4338-9851-f583a9a7c935 /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 4 13543 13843 300 ssTS@influx01 (github.com/segmentio/kafka-go)-3c847add-172f-4007-adf2-ce486686dd7c /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 9 13652 13951 299 ssTS@influx01 (github.com/segmentio/kafka-go)-3c847add-172f-4007-adf2-ce486686dd7c /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
I'm using the Segment.io Kafka library for Go: "github.com/segmentio/kafka-go".
My Kafka writer looks like this:
kafkaWriter := kafka.NewWriter(kafka.WriterConfig{
    Async:         false,
    Brokers:       config.KafkaHosts, // a string slice of 4 Kafka hosts
    QueueCapacity: kafkaQueueCapacity,
    Topic:         kafkaTopic,
    Balancer:      &kafka.LeastBytes{}, // same result with the default round-robin balancer
})
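The write path is the standard kafka-go call, roughly like this (a minimal sketch; the payload bytes are assumed to come from elsewhere in the application, and no Key is set, so partition choice is left entirely to the Balancer):

err := kafkaWriter.WriteMessages(context.Background(),
    kafka.Message{Value: payload}, // no Key set, so the Balancer alone picks the partition
)
if err != nil {
    log.Printf("kafka write failed: %v", err)
}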
My Kafka reader looks like this:
kafkaReader := kafka.NewReader(kafka.ReaderConfig{
    Brokers: config.KafkaHosts, // same as above
    GroupID: config.KafkaGroup,
    Topic:   config.KafkaTopic, // same as above
})
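The consume loop is the usual kafka-go pattern, roughly (a sketch; handleMessage stands in for the real processing):

for {
    // With a GroupID set, ReadMessage also commits the offset after a
    // successful read.
    msg, err := kafkaReader.ReadMessage(context.Background())
    if err != nil {
        break // reader closed or unrecoverable error
    }
    handleMessage(msg.Value) // hypothetical handler
}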
The topic was initially created like this:
conn.CreateTopics(kafka.TopicConfig{
    NumPartitions:     config.KafkaPartitions,  // == 10
    ReplicationFactor: config.KafkaReplication, // == 1
    Topic:             kafkaTopic,              // same as above
})
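For completeness, conn is a *kafka.Conn pointed at the cluster controller (CreateTopics has to be issued against the controller); obtaining it looks roughly like this (a sketch, since that setup code isn't shown above):

seed, err := kafka.Dial("tcp", config.KafkaHosts[0])
if err != nil {
    log.Fatal(err)
}
defer seed.Close()

// Topic creation goes to the controller broker, so look it up first.
controller, err := seed.Controller()
if err != nil {
    log.Fatal(err)
}
conn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
    log.Fatal(err)
}
defer conn.Close()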
When I run my program and watch host load and network activity, I see that almost all of the load/network activity lands on one of the four Kafka brokers. When I du the Kafka log directories, that same host holds far more Kafka data on disk than the others (e.g. 150M versus 15M).
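(The per-broker comparison was along these lines on each host; log.dirs is /tmp/kafka_logs per the generated config further down:)

[root@kafka01 kafka]# du -sh /tmp/kafka_logs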
What I want and expect to happen is for the load to be spread across all four Kafka servers so that no single one becomes a bottleneck (CPU or network). Why isn't that happening?
Edit (adding the requested command output):
[root@kafka01 kafka]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092
Topic: ssIncoming PartitionCount: 10 ReplicationFactor: 1 Configs: flush.ms=1000,segment.bytes=536870912,flush.messages=10000,retention.bytes=1073741824
Topic: ssIncoming Partition: 0 Leader: 4 Replicas: 4 Isr: 4
Topic: ssIncoming Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: ssIncoming Partition: 2 Leader: 3 Replicas: 3 Isr: 3
Topic: ssIncoming Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: ssIncoming Partition: 4 Leader: 4 Replicas: 4 Isr: 4
Topic: ssIncoming Partition: 5 Leader: 2 Replicas: 2 Isr: 2
Topic: ssIncoming Partition: 6 Leader: 3 Replicas: 3 Isr: 3
Topic: ssIncoming Partition: 7 Leader: 1 Replicas: 1 Isr: 1
Topic: ssIncoming Partition: 8 Leader: 4 Replicas: 4 Isr: 4
Topic: ssIncoming Partition: 9 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets PartitionCount: 50 ReplicationFactor: 1 Configs: compression.type=producer,cleanup.policy=compact,flush.ms=1000,segment.bytes=104857600,flush.messages=10000,retention.bytes=1073741824
Topic: __consumer_offsets Partition: 0 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 3 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 4 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 5 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 6 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 7 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 8 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 9 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 10 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 11 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 12 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 13 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 14 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 15 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 16 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 17 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 18 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 19 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 20 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 21 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 22 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 23 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 24 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 25 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 26 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 27 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 28 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 29 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 30 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 31 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 32 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 33 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 34 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 35 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 36 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 37 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 38 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 39 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 40 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 41 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 42 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 43 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 44 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 45 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 46 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 47 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 48 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 49 Leader: 1 Replicas: 1 Isr: 1
(Edit 2): Here are the variables I use when generating the Kafka config files. They are the same for each of the 4 brokers.
scala_version: 2.12
kafka_config_broker_id: 0
kafka_config_log_dirs: "/tmp/kafka_logs"
kafka_config_log_flush_interval_messages: 10000
kafka_config_log_flush_interval_ms: 1000
kafka_config_log_retention_bytes: 1073741824
kafka_config_log_retention_check_interval: 60000
kafka_config_log_retention_hours: 168
kafka_config_log_segment_bytes: 536870912
kafka_config_num_io_threads: 4
kafka_config_num_network_threads: 2
kafka_config_num_partitions: 2
kafka_config_offsets_topic_replication_factor: 1
kafka_config_receive_buffer_bytes: 1048576
kafka_config_request_max_bytes: 104857600
kafka_config_send_buffer_bytes: 1048576
kafka_config_zookeeper_connection_timeout_ms: 1000000
kafka_config_zookeeper_servers:
- consul01
- consul02
- consul03
kafka_exporter_version: 1.2.0
kafka_port: 9092
kafka_version: 2.4.0
These values are fed into an Ansible template. The generated Kafka configs look like this:
broker.id=1
port=9092
num.network.threads=2
num.io.threads=4
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka_logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
offsets.topic.replication.factor=1
zookeeper.connect=consul01:2181,consul02:2181,consul03:2181
zookeeper.connection.timeout.ms=1000000
delete.topic.enable=true
Note that this is a development setup and the brokers are respun frequently (several times a day). The problem persists after every respin.