Кафка - нескольким потребителям из одной группы назначен один и тот же раздел
Я только начал изучать Кафку и Нодейс. Я написал потребителю следующее
// consumer.js
const kafka = require('kafka-node');
var client = new kafka.Client('localhost:2181');
var topics = [{
topic: 'topic-4'
}];
var options = {
groupId: 'kafka-node-group-2',
autoCommit: true,
fetchMaxWaitMs: 1000,
fetchMaxBytes: 1024 * 1024,
encoding: 'buffer'
};
var consumer = new kafka.HighLevelConsumer(client, topics, options);
// consumer.payloads has only one entry
console.log('Topic', consumer.payloads[0].topic);
console.log('Group', consumer.options.groupId);
console.log('Assigned Partition:', consumer.payloads[0].partition);
выход
Topic topic-4
Group kafka-node-group-2
Assigned Partition: 0
topic-4
имеет четыре раздела.
./desc_topic.sh topic-4
Topic:topic-4 PartitionCount:4 ReplicationFactor:1 Configs:
Topic: topic-4 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: topic-4 Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: topic-4 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: topic-4 Partition: 3 Leader: 2 Replicas: 2 Isr: 2
Редактировать
Я использовал ConsumerGroup
следующим образом.
var options = {
host: 'localhost:2181', // zookeeper host omit if connecting directly to broker (see kafkaHost below)
groupId: 'Group-1',
sessionTimeout: 15000,
// // An array of partition assignment protocols ordered by preference.
// // 'roundrobin' or 'range' string for built ins (see below to pass in custom assignment protocol)
protocol: ['roundrobin']
};
var consumer = new kafka.ConsumerGroup(options, ['topic-4']);
Производитель отправляет 100 сообщений, полученных следующим образом. Вот так я знаю назначенный раздел (не из consumer
объекта).
{
topic: 'topic-4',
value: '{"subject":"Message Id 30 "}',
offset: 172,
partition: 0,
highWaterOffset: 173,
key: null
}
Когда я запускаю два таких потребительских экземпляра (одну и ту же тему и группу), только один из них получает все из раздела-0. Разве это тоже не проблема?
Это код производителя.
const kafka = require('kafka-node');
const Client = kafka.Client;
var client = new Client('localhost:2181', 'my-client-id', {
sessionTimeout: 300,
spinDelay: 100,
retries: 2
});
// For this demo we just log client errors to the console.
client.on('error', function(error) {
console.error(error);
});
var producer = new kafka.HighLevelProducer(client);
producer.on('ready', function() {
for (var i = 0; i <= 30; i++) {
let id = 'Message Id ' + i + ' ';
let msg = {
'subject': id
};
var messageBuffer = Buffer.from(JSON.stringify(msg));
// Create a new payload
var payload = [{
// topic: 'topic-', + (i%2+2),
topic: 'topic-4',
messages: messageBuffer,
timestamp: Date.now(),
attributes: 1 /* Use GZip compression for the payload */
}];
//Send payload to Kafka and log result/error
producer.send(payload, function(error, result) {
console.info('Sent payload to Kafka: ', payload);
if (error) {
console.error('Error', error);
} else {
var formattedResult = result[0];
console.log('result: ', result)
}
});
}
});
// For this demo we just log producer errors to the console.
producer.on('error', function(error) {
console.error(error);
});