Правильный способ чтения сообщений из темы Кафки, а затем закрытие - PullRequest
0 голосов
/ 29 мая 2019

Я создаю простой API-интерфейс node.js с использованием express и kafka-node, который возвращает непрочитанные сообщения из запрошенной темы Kafka и группы потребителей при получении HTTP-запроса, а затем закрывает соединение.Мне не нужно или я не хочу, чтобы потребитель продолжал ждать новых сообщений.

В kafka-node, как правильно проверить, достигнут ли конец темы, и если да, закройте соединениеброкеру и выходу из приложения, чтобы предотвратить чтение новых сообщений?

Вот мой consumer.js.Это почти то же самое, что пример, приведенный в документации к kafka-node.

"use strict";

const kafka = require("kafka-node");

let topicName = "testTopic-01",
  groupName = "testGroup-01",
  consumerOptions = {
    kafkaHost: "localhost: 9092",
    groupId: groupName,
    sessionTimeout: 15000,
    protocol: ["roundrobin"],
    fromOffset: "earliest",
    encoding: "utf8"
  };

const consumerGroup = new kafka.ConsumerGroup(consumerOptions, topicName);

consumerGroup.on("message", message => {
  console.log(`Message: ${message.value}`);
});
consumerGroup.on("error", error => {
  console.error(error);
});

console.log(`Consumer started on topic ${topicName} on group ${groupName}`);

1 Ответ

0 голосов
/ 29 мая 2019

Вы можете получить текущее смещение тематического раздела , используя # Offset . Сравнивая полученное таким образом смещение назначенного раздела темы, вы узнаете, какое последнее сообщение в соответствующем разделе темы.

Имейте в виду, что, если у вас есть несколько потребителей параллельно, вы должны отслеживать раздел раздела, которому был назначен ваш потребитель в группе потребителей ( # fetchCommits ).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...