У меня есть этот код, который загружается, когда запускается мой узел, я просто изменил этот код с использования статического списка тем для Kafka Consumer, и теперь он использует массив тем, которые он читает из базы данных. Чего я не знаю, так это как инициализировать Kafka Consumer для перезагрузки списка тем из базы данных, когда пользователь добавляет один из них, используя интерфейс реакции. Я могу сделать вызов API от внешнего интерфейса до конечного, но как только я получаю задний план, я не знаю, к чему обратиться потребителю Kafka, чтобы перечитать список из БД и начать использовать этот список тем, которые могут были изменены пользователем при добавлении или удалении.
Этот код использует пакет node-rdkafka. Я видел функцию consumer.disconnect, но не был уверен, что это лучший подход, или я могу просто перезагрузить свою функцию инициализации ??
async init(success, error, services) {
const client = new pg.Client(config.connectionString);
var topicsList;
await consumer.connect();
await client.connect()
await client.query('SELECT topic FROM public.job;').then(topics => {
topicsList = Array.from(Object.keys(topics.rows), k=>topics.rows[k].topic)
});
await client.end();
consumer.on('ready', function() {
//consumer.subscribe(['some_Data', 'other_Data', 'and_more_Data', 'deez_Data', 'etc_Data', 'hooha_Data', 'mine_Data']);
consumer.subscribe(topicsList);
consumer.consume();
})
consumer.on('data', function(data) {
const { container } = services;
success(data, services, 7) //write data to DB
});
}