Я пытаюсь получить метаданные тем моего брокера kafka с помощью клиента администрирования kafka js. Я написал свой сервер в Node.js + express. js.
Это мой index.js
файл, который является точкой входа npm.
'use strict';
const express = require('express');
const bodyParser = require('body-parser');
const Admin = require('./Admin/create-admin-client');
const TopicMetaData = require('./Admin/topic-metadata-fetch');
const app = express();
app.use(bodyParser.urlencoded({ extended: false }));
app.use(bodyParser.json());
const adminConfig = new Admin({
clientId: 'admin-client-4981',
brokers: ['localhost:9092']
})
const admin = adminConfig.getAdmin();
.
..
...
// Handles the admin connection, disconnection, and other routes here
...
..
.
//This is where the error is
app.post('/api/v1/dev/admin/topicmetadata', (req, res) => {
const topic = req.body;
const topicmetadata = new TopicMetaData(admin);
topicmetadata.setTopicConfig(topic);
topicmetadata.commit(req, res);
});
app.listen(4040);
Это create-admin-client.js
файл, который получает объект администратора.
'use strict';
const { Kafka } = require('kafkajs');
class Admin {
constructor(kafkaConfig) {
this.kafka = new Kafka({
clientId: kafkaConfig.clientId,
brokers: kafkaConfig.brokers
});
}
getAdmin() {
return this.kafka.admin();
};
}
module.exports = Admin;
Это файл topics-metadata-fetch.js
, который выбирает метаданные topi c.
'use strict';
class TopicMetaData {
constructor(admin) {
this.admin = admin;
}
setTopicConfig(topicConfig) {
this.topic = topicConfig;
}
commit(req, res) {
this.admin.fetchTopicMetadata({
topics: this.topic
})
.then((topics) => {
console.log("Topic MetaData Fetched Successfully!");
res.status(200).send({
topics
});
})
.catch((err) => {
console.error(err);
res.status(500).send(err);
})
}
}
module.exports = TopicMetaData;
Всякий раз, когда я отправляю запрос POST для получения метаданных topi c (например, «СЕРВИСНЫЕ ТИПЫ», я успешно создал topi c) с req.body
как
{
"topic": "SERVICE-TYPES",
"partitions": [{
"partitionErrorCode": 0,
"partitionId": 0,
"leader": 0,
"replicas": [0],
"isr": [0]
}]
}
Возвращает ошибку TypeError: topics.forEach is not a function
. Где я go ошибся?