Я пытаюсь использовать потребителя и производителя библиотекой npm kafka-node. Чтобы использовать kafka в качестве службы обмена сообщениями через nodejs.
Проблема в том, что, хотя мой производитель иногда работает, потребитель продолжает даватьошибка тайм-аута, или он просто зависает в бесконечном цикле, попробуйте подключиться к kafka, даже taff kafka работает нормально.
Я использую kafka-node на моей машине с Windows, когда Kafka находится на удаленной машине centos7,Странное поведение продолжается, даже если я помещаю весь код (потребитель и производитель) в ту же машину, что и kafka (думая, что mabye windows является частью проблемы).
Я пытался отправлять сообщения во встроенную в консоль производителя kafka, но мой потребитель все еще не появляется, чтобы подписаться на тему и получать сообщения.
Это мой простой код производителя:
const kclient = new kafka.KafkaClient({kafkaHost:'ADDR:9092'});
kclient.on('error',(err) => {
console.log(err)
})
kprod = new producer (kclient);
// kconsumer = new consumer(kclient);
kprod.on('error',(err) => {
console.log(`error: ${err}`);
})
kprod.on('ready',() => {
console.log(`connected to kafka`);
let tranNumSentToKafka = 0
for (let index = 0; index < transArray.length; index++) {
const element = JSON.stringify(transArray[index])
console.log(`sending data to kafka`);
kprod.send([{
topic:'test',
messages:element
}],
(err,data) => {
if(err){console.error(err)}
else{
tranNumSentToKafka += 1
console.log(`data sent: ${JSON.stringify(data)}`);
console.log(`sent ${tranNumSentToKafka} transactions to kafka`);
}
})
}
})
Когда я запускаю это, он иногда дает тайм-аутошибка, например, так:
{ TimeoutError: Request timed out after 30000ms
at new TimeoutError (C:\Users\Yishai Nachaliel\Documents\try\kafka-node-elastic\node_modules\kafka-node\lib\errors\TimeoutError.js:6:9)
at Timeout.timeoutId._createTimeout [as _onTimeout] (C:\Users\Yishai Nachaliel\Documents\try\kafka-node-elastic\node_modules\kafka-node\lib\kafkaClient.js:1007:14)
at ontimeout (timers.js:436:11)
at tryOnTimeout (timers.js:300:5)
at listOnTimeout (timers.js:263:5)
at Timer.processTimers (timers.js:223:10) message: 'Request timed out
after 30000ms' }
но иногда это работает, и это дает мне следующий вывод:
...
sent 96 transactions to kafka
data sent: {"test":{"0":15740}}
sent 97 transactions to kafka
data sent: {"test":{"0":15741}}
sent 98 transactions to kafka
data sent: {"test":{"0":15742}}
sent 99 transactions to kafka
data sent: {"test":{"0":15743}}
sent 100 transactions to kafka
Это мой простой потребитель:
kafka = require('kafka-node'),
producer = kafka.Producer,
consumer = kafka.Consumer;
const kclient = new kafka.KafkaClient({
kafkaHost:'10.0.0.55:9092'
// kafkaHost:'35.186.191.135:9092'
});
kclient.on('ready',() => {
console.log(`kclient ready`);
kconsumer = new consumer(kclient,[{
topic:'test',
partition:0
}]);
kconsumer.on('error',(err) => {
console.error(` in kconsumer: \n${err}\n`)
})
kconsumer.on('ready',() => {
console.log(`kconsumer ready`);
kconsumer.on('message',(msg) => {
console.log(`recived msg: ${msg}`);
})
})
})
kclient.on('error',(err) => {
console.error(`err in kclient: \n${err}\n`)
})
Когда я запускаю это на своей машине с Windows, я получаю:
kclient ready
in kconsumer:
TimeoutError: Request timed out after 30000ms
Когда я запускаю потребителя на машине Centos, я не получаю никаких ошибок, просто остановка:
kclient ready
и никогда "kconsumer ready".
Запуск обоих в режиме отладки просто показывает, что:
...
kafka-node:KafkaClient kafka-node-client reconnecting to ADDR:9092 +1s
kafka-node:KafkaClient kafka-node-client createBroker ADDR:9092 +2ms
kafka-node:Consumer connection closed +1s
kafka-node:KafkaClient kafka-node-client socket closed ADDR:9092 (hadError: true) +3ms
...
Когда я проверяю, работают ли kafka и zookeeper на моей машине centos, все работает нормально, япроверил это, используя консоли производителя / потребителя.
Кроме того, я проверил логи zookeeper и kafka, нет ошибок, но нет признаков того, что какой-либо производитель или потребители подключились или отправили какие-либо эфиры сообщений.
Кто-нибудь сталкивался с этой проблемойБиблиотека kafka-node?
Кто-нибудь нашел решение?