Я использую kafka-node, чтобы создать двух потребителей (consumer), как показано в коде ниже, но получаю следующую ошибку:
TopicsNotExistError: The topic(s) DEVICE_BINDED do not exist
at new TopicsNotExistError (/Users/xisizhe/Documents/projects/mean-master/node_modules/_kafka-node@3.0.0@kafka-node/lib/errors/TopicsNotExistError.js:11:9)
at /Users/xisizhe/Documents/projects/mean-master/node_modules/_kafka-node@3.0.0@kafka-node/lib/client.js:464:43
Как я могу это решить?
var kafka = require('kafka-node');
/**
 * Factory for kafka-node consumer and producer connections.
 *
 * Relies on the module-level `kafka` binding (kafka-node) and, for the
 * producer's ready message, on a `logger` object that is not defined in
 * this block (assumed to be provided elsewhere in the file).
 */
function ConnectionProvider() {
  /**
   * Creates a new consumer subscribed to partition 0 of the given topic.
   *
   * Every call builds a fresh client and consumer; `this.client` and
   * `this.kafkaConsumerConnection` are overwritten, so the provider only
   * remembers the most recently created pair.
   *
   * @param {string} topic_name - Name of the topic to consume from.
   * @returns {object} The newly created kafka.Consumer instance.
   */
  this.getConsumer = function (topic_name) {
    console.log('consumer consumer', topic_name);

    const client = new kafka.Client("localhost:2181");
    const consumer = new kafka.Consumer(client, [{ topic: topic_name, partition: 0 }]);
    this.client = client;
    this.kafkaConsumerConnection = consumer;

    consumer.on('ready', function () {
      console.log('consumer ready!');
    });
    // An event emitter throws when 'error' is emitted without a listener, so
    // a failure such as TopicsNotExistError would otherwise surface as an
    // uncaught exception. Log it the same way the producer does.
    consumer.on('error', function (err) {
      console.error("Error " + err);
    });

    return consumer;
  };

  /**
   * Returns the shared producer, creating it (and its client) on first use.
   *
   * @returns {object} The cached kafka.Producer instance.
   */
  this.getProducer = function () {
    if (!this.kafkaProducerConnection) {
      const client = new kafka.Client("localhost:2181");
      const producer = new kafka.Producer(client);
      this.client = client;
      this.kafkaProducerConnection = producer;

      // Attach the error listener immediately rather than inside the 'ready'
      // callback, so errors raised before the producer is ready are handled.
      producer.on('error', function (err) {
        console.error("Error " + err);
      });
      producer.on('ready', function () {
        logger.debug('producer ready');
      });
    }
    return this.kafkaProducerConnection;
  };
}
// Create one consumer per device-event topic through the `connection` provider.
// `connection` and `config` are not defined in this snippet; topic names are read
// from config.kafka.topic.
// NOTE(review): the reported TopicsNotExistError names DEVICE_BINDED, so that topic
// presumably does not exist on the broker yet — confirm it is created before consuming.
const device_add_consumer = connection.getConsumer(config.kafka.topic.DEVICE_ADDED);
const device_binded_consumer = connection.getConsumer(config.kafka.topic.DEVICE_BINDED);