Я создаю простой API-интерфейс node.js с использованием express и kafka-node, который возвращает непрочитанные сообщения из запрошенной темы Kafka и группы потребителей при получении HTTP-запроса, а затем закрывает соединение.Мне не нужно или я не хочу, чтобы потребитель продолжал ждать новых сообщений.
В kafka-node, как правильно проверить, достигнут ли конец темы, и если да, закройте соединениеброкеру и выходу из приложения, чтобы предотвратить чтение новых сообщений?
Вот мой consumer.js.Это почти то же самое, что пример, приведенный в документации к kafka-node.
"use strict";
const kafka = require("kafka-node");
let topicName = "testTopic-01",
groupName = "testGroup-01",
consumerOptions = {
kafkaHost: "localhost: 9092",
groupId: groupName,
sessionTimeout: 15000,
protocol: ["roundrobin"],
fromOffset: "earliest",
encoding: "utf8"
};
const consumerGroup = new kafka.ConsumerGroup(consumerOptions, topicName);
consumerGroup.on("message", message => {
console.log(`Message: ${message.value}`);
});
consumerGroup.on("error", error => {
console.error(error);
});
console.log(`Consumer started on topic ${topicName} on group ${groupName}`);