У меня есть тема в кафке как internal
.Я создал тему, используя приведенную ниже команду
/opt/kafka/bin/kafka-topics.sh
--create --zookeeper zookeeper:2181
--replication-factor 3 -partitions 6
--topic internal
Мне нужно использовать все сообщения на трех разных серверах узлов.Поэтому я использую модуль kafka-node в качестве группы потребителей с другим именем.Я создал имя группы потребителей под названием group1
, group2
, group3
.
Все работает нормально, я могу использовать все сообщения для всех потребителей.
Но когда любой брокерне работает, потребитель не получает никакого сообщения.Когда я перечисляю все группы потребителей, он не отображает конкретный идентификатор группы.
(например) Если nodeserver 1
не работает, в брокере нет доступных групп с именем group1
Evenесли я перезапускаю сервер узла, он не создает группу в посреднике и не потребляет никаких сообщений на соответствующем сервере узла.Но когда посредник работает и сервер узла перезапускается, он создает группу в посреднике, и сервер узла может получать сообщение.
consumer.js
const options = {
kafkaHost: process.env.KAFKA_HOST,
groupId: group_id, //group1 (or) group2 (or) group3
autoCommit: true,
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'latest',
outOfRangeOffset: 'earliest',
migrateHLC: false,
migrateRolling: true,
fetchMaxBytes: 1024 * 1024 * 10,
commitOffsetsOnFirstJoin: true,
onRebalance: (isAlreadyMember, callback) => {
log.info({"ALREADY_MEMBER_isAlreadyMember": isAlreadyMember});
callback();
}
};
const consumerGroup = new ConsumerGroup(options, process.env.KAFKA_TOPIC);
// On receiving message
consumerGroup.on("message", handMessage); //handMessage is where the message has been handled
// On error receiving message
consumerGroup.on('error', function(err) {
log.debug({"type": "KAFKA_CONSUMER_ERROR", "msg": err});
});
// On error receiving message
consumerGroup.on('offsetOutOfRange', function(err) {
log.debug({"type": "KAFKA_CONSUMER_RANGE_ERROR", "msg": err});
});
ОБНОВЛЕНИЕ - 1
Даже если я обновил offsets.topic.replication.factor
как 2
или 3
, у меня возникла та же проблема.Когда какой-либо брокер не работает, соответствующий сервер узла не использует сообщение.А также, когда я показываю список групп в брокере, он показывает только group2
и group3
.Но group1
не существует, когда broker1
не работает.Даже если я перезапущу потребителя узла, group1
не создается.
server.properties
broker.id=1
listeners=INSIDE://:9092,OUTSIDE://:9094
advertised.listeners=INSIDE://:9092,OUTSIDE://:9094
listener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-logs-d3f14c9ddf0a
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
zookeeper.connection.timeout.ms=16000
group.initial.rebalance.delay.ms=0
inter.broker.listener.name=INSIDE
advertised.port=9094
port=9092
auto.create.topics.enable=false
ОБНОВЛЕНИЕ - 2
Когда брокер не работает, координатор группы удаляется, и он не переизбирается автоматически.
Ребята, скажите, что я сделал не так?Или мне нужно обновить что-нибудь еще?