Я создаю приложение для распространения событий, которое распространяется в трех местах AWS.Приложение на каждом сайте подключено к 2 экземплярам kafka (расположенным в одном из EC2) и в зависимости от поведения приложения оно записывает (создает) в одну очередь или считывает (использует) из другой очереди.
Приложение построено с использованием NodeJS и использует библиотеку kafka-node (https://www.npmjs.com/package/kafka-node).
). Проблема в том, что потребитель читает сообщения, предположительно фиксирует его, но как только все сообщения прочитаны, потребитель начинает сначало. Кроме того, сообщение со смещением = 0 всегда пустое, хотя я никогда не создаю пустое сообщение.
Удивительно, но это происходит только тогда, когда оно развернуто в разных местах. В моем локальном хосте настройка работает отлично.
Ниже приведен фрагмент моего потребителя:
function consumeKafkaMessages() {
const client = new kafka.KafkaClient({kafkaHost: process.env.KAFKA_CONSUMER_HOSTNAME});
const consumer = new kafka.Consumer(
client,
[{ topic: config.KAFKA_TOPIC }],
{ autoCommit: true }
);
consumer.on('message', async function(message) {
console.log("===============\n");
console.log("Consumed operation from kafka => " + JSON.stringify(message));
console.log("===============\n");
let parsedMessage;
let failed = false;
try {
parsedMessage = JSON.parse(message.value);
} catch (error) {
failed = true;
}
if (!failed) {
// application logic, not important
}
});
Есть идеи?
Вся помощь приветствуется: -)