узел группы потребителей kafka не получает сообщение, когда брокер выключен - PullRequest
0 голосов
/ 24 декабря 2018

У меня есть тема в кафке как internal.Я создал тему, используя приведенную ниже команду

/opt/kafka/bin/kafka-topics.sh 
     --create --zookeeper zookeeper:2181 
     --replication-factor 3 -partitions 6 
     --topic internal

Мне нужно использовать все сообщения на трех разных серверах узлов.Поэтому я использую модуль kafka-node в качестве группы потребителей с другим именем.Я создал имя группы потребителей под названием group1, group2, group3.

Все работает нормально, я могу использовать все сообщения для всех потребителей.

Но когда любой брокерне работает, потребитель не получает никакого сообщения.Когда я перечисляю все группы потребителей, он не отображает конкретный идентификатор группы.

(например) Если nodeserver 1 не работает, в брокере нет доступных групп с именем group1

Evenесли я перезапускаю сервер узла, он не создает группу в посреднике и не потребляет никаких сообщений на соответствующем сервере узла.Но когда посредник работает и сервер узла перезапускается, он создает группу в посреднике, и сервер узла может получать сообщение.

consumer.js

const options = {
  kafkaHost: process.env.KAFKA_HOST, 
  groupId: group_id, //group1 (or) group2 (or) group3
  autoCommit: true,
  sessionTimeout: 15000,
  protocol: ['roundrobin'],
  fromOffset: 'latest',
  outOfRangeOffset: 'earliest',
  migrateHLC: false,
  migrateRolling: true,
  fetchMaxBytes: 1024 * 1024 * 10,
  commitOffsetsOnFirstJoin: true,
  onRebalance: (isAlreadyMember, callback) => {
      log.info({"ALREADY_MEMBER_isAlreadyMember": isAlreadyMember});
      callback();
  }
};

const consumerGroup = new ConsumerGroup(options, process.env.KAFKA_TOPIC);

// On receiving message
consumerGroup.on("message", handMessage); //handMessage is where the message has been handled

// On error receiving message
consumerGroup.on('error', function(err) {
    log.debug({"type": "KAFKA_CONSUMER_ERROR", "msg": err});
});
// On error receiving message
consumerGroup.on('offsetOutOfRange', function(err) {
    log.debug({"type": "KAFKA_CONSUMER_RANGE_ERROR", "msg": err});
});

ОБНОВЛЕНИЕ - 1

Даже если я обновил offsets.topic.replication.factor как 2 или 3, у меня возникла та же проблема.Когда какой-либо брокер не работает, соответствующий сервер узла не использует сообщение.А также, когда я показываю список групп в брокере, он показывает только group2 и group3.Но group1 не существует, когда broker1 не работает.Даже если я перезапущу потребителя узла, group1 не создается.

server.properties

broker.id=1
listeners=INSIDE://:9092,OUTSIDE://:9094
advertised.listeners=INSIDE://:9092,OUTSIDE://:9094
listener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-logs-d3f14c9ddf0a
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
zookeeper.connection.timeout.ms=16000
group.initial.rebalance.delay.ms=0
inter.broker.listener.name=INSIDE
advertised.port=9094
port=9092
auto.create.topics.enable=false

ОБНОВЛЕНИЕ - 2

Когда брокер не работает, координатор группы удаляется, и он не переизбирается автоматически.

Ребята, скажите, что я сделал не так?Или мне нужно обновить что-нибудь еще?

Ответы [ 2 ]

0 голосов
/ 26 декабря 2018

Даже если я обновил offsets.topic.replication.factor как 2 или 3, у меня возникла та же проблема.Когда какой-либо брокер не работает, соответствующий сервер узла не потребляет сообщение

После создания темы смещений изменение этого свойства ничего не дает.

Если для было установлено , то теперь вам нужно , чтобы вручную увеличить его

0 голосов
/ 26 декабря 2018

Предполагая, что это как минимум Kafka 1.x, требуются некоторые изменения в отношении HA для internal тем Kafka.Рассмотрим следующий фрагмент из server.properties.Значения по умолчанию для репликации установлены на 1. В вашем случае для 3 брокеров их установка на 2 может быть хорошим началом.

# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1

Добавление

Насколько я понимаю, у каждой группы потребителей есть свой координатор группы.Итак, если есть несколько групп, использующих тему, то может быть несколькими координаторами (разными посредниками) для этой темы.Брокер может действовать как group coordinator для нескольких групп потребителей.Но для группы потребителей есть только один брокер, который выступает в качестве координатора.Для конкретной группы потребителей мы можем проверить, какой посредник является координатором, с помощью этой команды:

./kafka-consumer-groups.sh --bootstrap-server <broker-host>:9092 --describe --group <consumer-group> --state 

Если координатор не работает, в качестве координатора выбирается другой посредник.Стратегия восстановления после сбоя подробно объясняется здесь в разделе 10.

...