Из приложения Node.js я пытаюсь подключиться к брокерам Apache Kafka (версия 0.11.0.3) с помощью клиента node-rdkafka. Приложение может подключиться к kafka, подписаться на темы на некоторое время, но внезапно я вижу следующую ошибку после примерно полутора дней в журналах (обратите внимание на метку времени). После этих логов я больше не вижу успешного подключения к кафке. Какие дополнительные опции мне нужны, чтобы избежать этой ошибки? Не было такой ошибки в Java codebase, которая подключается к тому же кластеру kafka.
Успешных журналов:
2019-03-15T07:15:24.364Z - [32minfo[39m: Producer receipt-svc#producer-1 ready
2019-03-15T07:15:24.410Z - [32minfo[39m: Consumer receipt-svc#consumer-2 ready
2019-03-15T07:15:24.411Z - [32minfo[39m: Subscribing to following topics : XXXX.XXXX.XXXX,XXXX.XXXX.XXXX,XXXX.XXXX.XXXX
2019-03-15T07:15:24.462Z - [32minfo[39m: Consumer receipt-svc#consumer-3 ready
2019-03-15T07:15:24.463Z - [32minfo[39m: Subscribing to following topics : XXXX.XXXX.XXXX
Журналы ошибок:
<------Kafka event log : Start ------->
2019-03-16T12:25:25.629Z - [32minfo[39m: severity=4, fac=METADATA, message=[thrd:main]: sasl_ssl://sl73kfkapp002.visa.com:8443/43: Metadata request failed: periodic refresh: Local: Required feature not supported by broker (0ms): Permanent
2019-03-16T12:25:25.630Z - [32minfo[39m: <------Kafka event log : End ------->
2019-03-17T12:30:26.301Z - [32minfo[39m: <------Kafka event log : Start ------->
2019-03-17T12:30:26.302Z - [32minfo[39m: severity=4, fac=METADATA, message=[thrd:main]: sasl_ssl://sl73kfkapp002.visa.com:8443/43: Metadata request failed: refresh unavailable topics: Local: Required feature not supported by broker (0ms): Permanent
2019-03-17T12:30:26.303Z - [32minfo[39m: <------Kafka event log : End ------->
Над журналами напечатаны следующие фрагменты кода
const consumer = new Kafka.KafkaConsumer(
{
"client.id": config["client.id"],
"group.id": config["group.id"],
"security.protocol": config["security.protocol"],
"metadata.broker.list": config["metadata.broker.list"],
"ssl.ca.location": config["ssl.ca.location"],
"ssl.certificate.location": config["ssl.certificate.location"],
"ssl.key.location": config["ssl.key.location"],
"ssl.key.password": this.machineKey.decrypt(config["ssl.key.password"]),
"sasl.kerberos.service.name": config["sasl.kerberos.service.name"],
"sasl.kerberos.principal": config["sasl.kerberos.principal"],
"sasl.kerberos.keytab": config["sasl.kerberos.keytab"]
},
{
"auto.offset.reset": "beginning",
"enable.auto.commit": true
}
);
consumer.on("event.log", function(log) {
logger.info("<------Kafka event log : Start ------->");
logger.info(log);
logger.info("<------Kafka event log : End ------->");
});
consumer.on("ready", function(arg) {
logger.info(`Consumer ${arg.name} ready`);
logger.info("Subscribing to following topics : " + topics);
consumer.subscribe(topics);
consumer.consume();
});
consumer.connect();