Как перезагрузить / повторно инициализировать Kafka Consumer с добавлением нового списка тем в БД - PullRequest
0 голосов
/ 27 июня 2018

У меня есть этот код, который загружается, когда запускается мой узел, я просто изменил этот код с использования статического списка тем для Kafka Consumer, и теперь он использует массив тем, которые он читает из базы данных. Чего я не знаю, так это как инициализировать Kafka Consumer для перезагрузки списка тем из базы данных, когда пользователь добавляет один из них, используя интерфейс реакции. Я могу сделать вызов API от внешнего интерфейса до конечного, но как только я получаю задний план, я не знаю, к чему обратиться потребителю Kafka, чтобы перечитать список из БД и начать использовать этот список тем, которые могут были изменены пользователем при добавлении или удалении.

Этот код использует пакет node-rdkafka. Я видел функцию consumer.disconnect, но не был уверен, что это лучший подход, или я могу просто перезагрузить свою функцию инициализации ??

async init(success, error, services) {

    const client = new pg.Client(config.connectionString);
    var topicsList;

    await consumer.connect(); 
    await client.connect()
    await client.query('SELECT topic FROM public.job;').then(topics => {
        topicsList = Array.from(Object.keys(topics.rows), k=>topics.rows[k].topic)
    });
    await client.end(); 

   consumer.on('ready', function() {
      //consumer.subscribe(['some_Data', 'other_Data', 'and_more_Data', 'deez_Data', 'etc_Data', 'hooha_Data', 'mine_Data']);
      consumer.subscribe(topicsList);
      consumer.consume();
    })
    consumer.on('data', function(data) {
        const { container } = services;

        success(data, services, 7)  //write data to DB

    });

  }
...