потребитель и производитель, созданные с помощью kafka-узла, не могут подключиться к рабочему экземпляру kafka - PullRequest
0 голосов
/ 19 июня 2019

Я пытаюсь использовать потребителя и производителя библиотекой npm kafka-node. Чтобы использовать kafka в качестве службы обмена сообщениями через nodejs.

Проблема в том, что, хотя мой производитель иногда работает, потребитель продолжает даватьошибка тайм-аута, или он просто зависает в бесконечном цикле, попробуйте подключиться к kafka, даже taff kafka работает нормально.

Я использую kafka-node на моей машине с Windows, когда Kafka находится на удаленной машине centos7,Странное поведение продолжается, даже если я помещаю весь код (потребитель и производитель) в ту же машину, что и kafka (думая, что mabye windows является частью проблемы).

Я пытался отправлять сообщения во встроенную в консоль производителя kafka, но мой потребитель все еще не появляется, чтобы подписаться на тему и получать сообщения.

Это мой простой код производителя:

const kclient = new kafka.KafkaClient({kafkaHost:'ADDR:9092'});
kclient.on('error',(err) => {
    console.log(err)
})
kprod = new producer (kclient);
// kconsumer = new consumer(kclient);
kprod.on('error',(err) => {
    console.log(`error: ${err}`);
})

kprod.on('ready',() => {
    console.log(`connected to kafka`);
    let tranNumSentToKafka = 0
    for (let index = 0; index < transArray.length; index++) {
        const element = JSON.stringify(transArray[index])
        console.log(`sending data to kafka`);
        kprod.send([{
            topic:'test',
            messages:element
        }],
            (err,data) => { 
            if(err){console.error(err)}
            else{
                tranNumSentToKafka += 1
                console.log(`data sent: ${JSON.stringify(data)}`);
                console.log(`sent ${tranNumSentToKafka} transactions to kafka`);
            }
        })
    }
})

Когда я запускаю это, он иногда дает тайм-аутошибка, например, так:

{ TimeoutError: Request timed out after 30000ms
    at new TimeoutError (C:\Users\Yishai Nachaliel\Documents\try\kafka-node-elastic\node_modules\kafka-node\lib\errors\TimeoutError.js:6:9)
    at Timeout.timeoutId._createTimeout [as _onTimeout] (C:\Users\Yishai Nachaliel\Documents\try\kafka-node-elastic\node_modules\kafka-node\lib\kafkaClient.js:1007:14)
    at ontimeout (timers.js:436:11)
    at tryOnTimeout (timers.js:300:5)
    at listOnTimeout (timers.js:263:5)
    at Timer.processTimers (timers.js:223:10) message: 'Request timed out
after 30000ms' }

но иногда это работает, и это дает мне следующий вывод:

...
sent 96 transactions to kafka
data sent: {"test":{"0":15740}}
sent 97 transactions to kafka
data sent: {"test":{"0":15741}}
sent 98 transactions to kafka
data sent: {"test":{"0":15742}}
sent 99 transactions to kafka
data sent: {"test":{"0":15743}}
sent 100 transactions to kafka

Это мой простой потребитель:

kafka = require('kafka-node'),
producer = kafka.Producer,
consumer = kafka.Consumer;

const kclient = new kafka.KafkaClient({
    kafkaHost:'10.0.0.55:9092'
    // kafkaHost:'35.186.191.135:9092'
});
kclient.on('ready',() => {
    console.log(`kclient ready`);
    kconsumer = new consumer(kclient,[{
        topic:'test',
        partition:0
    }]);
    kconsumer.on('error',(err) => {
        console.error(` in kconsumer: \n${err}\n`)
    })
    kconsumer.on('ready',() => {
        console.log(`kconsumer ready`);
        kconsumer.on('message',(msg) => {
            console.log(`recived msg: ${msg}`);
        })

    })

})
kclient.on('error',(err) => {
    console.error(`err in kclient: \n${err}\n`)
})

Когда я запускаю это на своей машине с Windows, я получаю:

kclient ready
 in kconsumer:
TimeoutError: Request timed out after 30000ms

Когда я запускаю потребителя на машине Centos, я не получаю никаких ошибок, просто остановка:

kclient ready

и никогда "kconsumer ready".

Запуск обоих в режиме отладки просто показывает, что:

...
  kafka-node:KafkaClient kafka-node-client reconnecting to ADDR:9092 +1s
  kafka-node:KafkaClient kafka-node-client createBroker ADDR:9092 +2ms
  kafka-node:Consumer connection closed +1s
  kafka-node:KafkaClient kafka-node-client socket closed ADDR:9092 (hadError: true) +3ms
...

Когда я проверяю, работают ли kafka и zookeeper на моей машине centos, все работает нормально, япроверил это, используя консоли производителя / потребителя.

Кроме того, я проверил логи zookeeper и kafka, нет ошибок, но нет признаков того, что какой-либо производитель или потребители подключились или отправили какие-либо эфиры сообщений.

Кто-нибудь сталкивался с этой проблемойБиблиотека kafka-node?

Кто-нибудь нашел решение?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...