Почему моя нагрузка Kafka не сбалансирована между брокерами? - PullRequest
0 голосов
/ 15 февраля 2020

У меня чтение группы потребителей из топи c с десятью разделами:

[root@kafka01 kafka]# ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group ssIncomingGroup --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                                        HOST            CLIENT-ID
ssIncomingGroup ssIncoming      3          13688           13987           299             ssTS@influx01 (github.com/segmentio/kafka-go)-f1c5b4c7-9cf0-4132-902a-db9d0429d520 /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming      7          13484           13868           384             ssTS@influx01 (github.com/segmentio/kafka-go)-f1c5b4c7-9cf0-4132-902a-db9d0429d520 /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming      2          13322           13698           376             ssTS@influx01 (github.com/segmentio/kafka-go)-20ee82a9-825d-4d9a-9f20-f4610c21f171 /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming      8          13612           13899           287             ssTS@influx01 (github.com/segmentio/kafka-go)-20ee82a9-825d-4d9a-9f20-f4610c21f171 /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming      1          13568           13932           364             ssTS@influx01 (github.com/segmentio/kafka-go)-df68ca85-d722-47ef-82c2-2fd60e186fac /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming      6          13651           13950           299             ssTS@influx01 (github.com/segmentio/kafka-go)-df68ca85-d722-47ef-82c2-2fd60e186fac /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming      0          13609           13896           287             ssTS@influx01 (github.com/segmentio/kafka-go)-10b7f10f-9535-4338-9851-f583a9a7c935 /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming      5          13646           13945           299             ssTS@influx01 (github.com/segmentio/kafka-go)-10b7f10f-9535-4338-9851-f583a9a7c935 /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming      4          13543           13843           300             ssTS@influx01 (github.com/segmentio/kafka-go)-3c847add-172f-4007-adf2-ce486686dd7c /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming      9          13652           13951           299             ssTS@influx01 (github.com/segmentio/kafka-go)-3c847add-172f-4007-adf2-ce486686dd7c /192.168.33.10  ssTS@influx01 (github.com/segmentio/kafka-go)

Я использую библиотеку Segment.io Kaka для Go: "github.com/segmentio/kafka-go".

Мой писатель Кафки выглядит так:

kafkaWriter := kafka.NewWriter(kafka.WriterConfig{
    Async:         false,
    Brokers:       config.KafkaHosts,  // a string slice of 4 Kafka hosts
    QueueCapacity: kafkaQueueCapacity,
    Topic:         kafkaTopic,
    Balancer: &kafka.LeastBytes{},  // Same result with the default round-robin balancer
})

Мой читатель Кафки выглядит так:

    kafkaReader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: config.KafkaHosts,  // same as above
        GroupID: config.KafkaGroup,
        Topic:   config.KafkaTopic,  // same as above
    })

Топи c изначально был создан так:

conn.CreateTopics(kafka.TopicConfig{
    NumPartitions:     config.KafkaPartitions,  // == 10
    ReplicationFactor: config.KafkaReplication,  // == 1
    Topic:             kafkaTopic,  // same as above
})

Когда я запускаю свою программу и наблюдаю за нагрузкой на хост и сеть, я вижу, что почти вся нагрузка / сетевая активность выполняется на одном из четырех брокеров Kafka. Когда я du записываю каталоги для хостов Kafka, на этом же хосте гораздо больше данных Kafka на ФС, чем на других (например, 150M вместо 15M).

Что я хочу и ожидаю случается, что нагрузка распределяется между всеми четырьмя серверами Kafka, чтобы один из них не становился узким местом (из ЦП или сети). Почему этого не происходит?

Редактировать (добавить запрошенный вывод команды):

[root@kafka01 kafka]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092                                                                                                                                    
Topic: ssIncoming       PartitionCount: 10      ReplicationFactor: 1    Configs: flush.ms=1000,segment.bytes=536870912,flush.messages=10000,retention.bytes=1073741824                                                    
        Topic: ssIncoming       Partition: 0    Leader: 4       Replicas: 4     Isr: 4                                                                                                                                    
        Topic: ssIncoming       Partition: 1    Leader: 2       Replicas: 2     Isr: 2                                                                                                                                    
        Topic: ssIncoming       Partition: 2    Leader: 3       Replicas: 3     Isr: 3                       
        Topic: ssIncoming       Partition: 3    Leader: 1       Replicas: 1     Isr: 1
        Topic: ssIncoming       Partition: 4    Leader: 4       Replicas: 4     Isr: 4                       
        Topic: ssIncoming       Partition: 5    Leader: 2       Replicas: 2     Isr: 2                      
        Topic: ssIncoming       Partition: 6    Leader: 3       Replicas: 3     Isr: 3                       
        Topic: ssIncoming       Partition: 7    Leader: 1       Replicas: 1     Isr: 1                       
        Topic: ssIncoming       Partition: 8    Leader: 4       Replicas: 4     Isr: 4                       
        Topic: ssIncoming       Partition: 9    Leader: 2       Replicas: 2     Isr: 2                       
Topic: __consumer_offsets       PartitionCount: 50      ReplicationFactor: 1    Configs: compression.type=producer,cleanup.policy=compact,flush.ms=1000,segment.bytes=104857600,flush.messages=10000,retention.bytes=1073$41824     
        Topic: __consumer_offsets       Partition: 0    Leader: 4       Replicas: 4     Isr: 4               
        Topic: __consumer_offsets       Partition: 1    Leader: 1       Replicas: 1     Isr: 1                                                                                                                            
        Topic: __consumer_offsets       Partition: 2    Leader: 2       Replicas: 2     Isr: 2               
        Topic: __consumer_offsets       Partition: 3    Leader: 3       Replicas: 3     Isr: 3               
        Topic: __consumer_offsets       Partition: 4    Leader: 4       Replicas: 4     Isr: 4               
        Topic: __consumer_offsets       Partition: 5    Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 6    Leader: 2       Replicas: 2     Isr: 2          
        Topic: __consumer_offsets       Partition: 7    Leader: 3       Replicas: 3     Isr: 3      
        Topic: __consumer_offsets       Partition: 8    Leader: 4       Replicas: 4     Isr: 4
        Topic: __consumer_offsets       Partition: 9    Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 10   Leader: 2       Replicas: 2     Isr: 2        
        Topic: __consumer_offsets       Partition: 11   Leader: 3       Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 12   Leader: 4       Replicas: 4     Isr: 4
        Topic: __consumer_offsets       Partition: 13   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 14   Leader: 2       Replicas: 2     Isr: 2        
        Topic: __consumer_offsets       Partition: 15   Leader: 3       Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 16   Leader: 4       Replicas: 4     Isr: 4
        Topic: __consumer_offsets       Partition: 17   Leader: 1       Replicas: 1     Isr: 1      
        Topic: __consumer_offsets       Partition: 18   Leader: 2       Replicas: 2     Isr: 2               
        Topic: __consumer_offsets       Partition: 19   Leader: 3       Replicas: 3     Isr: 3                                                                                                                            
        Topic: __consumer_offsets       Partition: 20   Leader: 4       Replicas: 4     Isr: 4                                                                                                                            
        Topic: __consumer_offsets       Partition: 21   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 22   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 23   Leader: 3       Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 24   Leader: 4       Replicas: 4     Isr: 4
        Topic: __consumer_offsets       Partition: 25   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 26   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 27   Leader: 3       Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 28   Leader: 4       Replicas: 4     Isr: 4
        Topic: __consumer_offsets       Partition: 29   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 30   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 31   Leader: 3       Replicas: 3     Isr: 3                                                                                                                            
        Topic: __consumer_offsets       Partition: 32   Leader: 4       Replicas: 4     Isr: 4               
        Topic: __consumer_offsets       Partition: 33   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 34   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 35   Leader: 3       Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 36   Leader: 4       Replicas: 4     Isr: 4
        Topic: __consumer_offsets       Partition: 37   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 38   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 39   Leader: 3       Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 40   Leader: 4       Replicas: 4     Isr: 4
        Topic: __consumer_offsets       Partition: 41   Leader: 1       Replicas: 1     Isr: 1
        Topic: __consumer_offsets       Partition: 42   Leader: 2       Replicas: 2     Isr: 2
        Topic: __consumer_offsets       Partition: 43   Leader: 3       Replicas: 3     Isr: 3
        Topic: __consumer_offsets       Partition: 44   Leader: 4       Replicas: 4     Isr: 4
        Topic: __consumer_offsets       Partition: 45   Leader: 1       Replicas: 1     Isr: 1
    Topic: __consumer_offsets       Partition: 46   Leader: 2       Replicas: 2     Isr: 2
    Topic: __consumer_offsets       Partition: 47   Leader: 3       Replicas: 3     Isr: 3
    Topic: __consumer_offsets       Partition: 48   Leader: 4       Replicas: 4     Isr: 4
    Topic: __consumer_offsets       Partition: 49   Leader: 1       Replicas: 1     Isr: 1

(Редактировать 2): Вот переменные, которые я использую при создании файлов конфигурации Kafka. Они одинаковы для каждого из 4 брокеров.

scala_version: 2.12
kafka_config_broker_id: 0
kafka_config_log_dirs: "/tmp/kafka_logs"
kafka_config_log_flush_interval_messages: 10000
kafka_config_log_flush_interval_ms: 1000
kafka_config_log_retention_bytes: 1073741824
kafka_config_log_retention_check_interval: 60000
kafka_config_log_retention_hours: 168
kafka_config_log_segment_bytes: 536870912
kafka_config_num_io_threads: 4
kafka_config_num_network_threads: 2
kafka_config_num_partitions: 2
kafka_config_offsets_topic_replication_factor: 1
kafka_config_receive_buffer_bytes: 1048576
kafka_config_request_max_bytes: 104857600
kafka_config_send_buffer_bytes: 1048576
kafka_config_zookeeper_connection_timeout_ms: 1000000
kafka_config_zookeeper_servers:
    - consul01
    - consul02
    - consul03
kafka_exporter_version: 1.2.0
kafka_port: 9092
kafka_version: 2.4.0

Эти данные используются в шаблоне Ansible. Сгенерированные конфетки кафки выглядят так:

broker.id=1
port=9092
num.network.threads=2
num.io.threads=4
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka_logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=536870912
log.retention.check.interval.ms=60000

# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

offsets.topic.replication.factor=1

zookeeper.connect=consul01:2181,consul02:2181,consul03:2181
zookeeper.connection.timeout.ms=1000000

delete.topic.enable=true

Обратите внимание, что это для разработки, и они часто обновляются (несколько раз в день). Проблема сохраняется после каждого респина.

1 Ответ

0 голосов
/ 16 февраля 2020

Кажется, что нагрузка теперь очень хорошо сбалансирована:

  • Лидеры разделов распределяются между брокерами максимально сбалансированным образом
    • Брокер 1 является лидером разделов 3,7
    • Брокер 2 - лидер разделов 1,5,9
    • Брокер 3 - лидер разделов 2,6
    • Брокер 4 - лидер разделов 0,4, 8
  • Разделы также назначаются потребителям равномерно (2 раздела на одного потребителя)
  • Количество смещений в разделах почти одинаково (так что, похоже, вы создаете сообщения на разделы равномерно)

Когда я записываю каталоги журналов для хостов Kafka, тот же хост имеет гораздо больше данных Kafka на ФС, чем другие (например, 150M по сравнению с 15M ).

Смещения журналов в разделах практически одинаковы. Но, конечно, у брокеров 2 и 4 должно быть гораздо больше данных, поскольку они имеют дело с еще одним разделом, как вы видите. Также сетевой трафик c должен быть намного больше, потому что они имеют дело с 3 разделами. (опрашивать запросы потребителей, а также отправлять запросы производителей)

Но все же в 10 раз больше данных в одном брокере не имеет смысла. ИМХО, в какой-то момент один или несколько брокеров были нездоровы (не может послать сердцебиение Zookeeper или внизу), и Controller назначил разделы здоровым брокерам, и некоторое время некоторые брокеры заботились о гораздо большем количестве разделов. (кстати, auto.leader.rebalance.enable должно быть true для этого scneario)

Примечание: я предполагаю, что конфиги вашего брокера (особенно конфиги о log.retention играют важную роль для данных, хранящихся в брокерах) и системные ресурсы брокеры одинаковы. Если это не так, вы должны указать это.


Кстати, если вас не устраивает текущее назначение разделов брокерам. Вы можете вручную изменить его с помощью инструмента kafka-reassign-partitions.sh. Вам просто нужно создать файл json, который задает реплики разделов.

Например:

{"version":1,
  "partitions":[
     {"topic":"ssIncoming","partition":0,"replicas":[1]},
     {"topic":"ssIncoming","partition":1,"replicas":[1]},
     {"topic":"ssIncoming","partition":2,"replicas":[1]},
     {"topic":"ssIncoming","partition":3,"replicas":[2]},
     {"topic":"ssIncoming","partition":4,"replicas":[2]},
     {"topic":"ssIncoming","partition":5,"replicas":[3]},
     {"topic":"ssIncoming","partition":6,"replicas":[3]},
     {"topic":"ssIncoming","partition":7,"replicas":[3]},
     {"topic":"ssIncoming","partition":8,"replicas":[4]},
     {"topic":"ssIncoming","partition":9,"replicas":[4]}
]}

Затем вам нужно просто запустить эту команду:

./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file change-replicas.json --execute
...