Как я могу реализовать nodeJS worker, который передает данные из mon go в elasticsearch? - PullRequest
0 голосов
/ 07 августа 2020

Я создаю приложение на основе CD C, которое использует Mon go Change Streams для прослушивания событий изменений и индексации изменений в elasticsearch почти в реальном time.

Пока что я реализовал воркера, который вызывает функцию для захвата событий, их преобразования и индексации в elasticsearch без проблем при реализации потока для 1 мес. go collection:

function syncChangeEvents() {
  const stream = ModelA.watch()
  while (!stream.isClosed()) {
    if (await stream.hasNext()) {
      const event = stream.next()
      // transform event
      // index to elasticsearch
    }
  }
}

Я реализовал это с использованием бесконечного l oop (вероятно, плохой подход), но я не уверен, какие есть альтернативы, когда мне нужно постоянно поддерживать поток изменений.

Проблема возникает, когда мне нужно реализовать поток изменений для другой модели. Поскольку первая функция имеет блокировку времени l oop, рабочий не может вызвать вторую функцию, чтобы запустить второй поток изменений.

Мне интересно, как лучше всего развернуться рабочий, который может вызвать x нет. потоков изменений, не влияя на производительность каждого потока изменений. Подойдут ли рабочие потоки к go?

1 Ответ

1 голос
/ 07 августа 2020

Существует три основных способа работы с потоками изменений в Node.js.

  1. Вы можете отслеживать поток изменений с помощью функции EventEmitter on () .

     // See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
     const changeStream = collection.watch(pipeline);
    
     // ChangeStream inherits from the Node Built-in Class EventEmitter (https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_class_eventemitter).
     // We can use EventEmitter's on() to add a listener function that will be called whenever a change occurs in the change stream.
     // See https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_emitter_on_eventname_listener for the on() docs.
     changeStream.on('change', (next) => {
         console.log(next);
     });
    
     // Wait the given amount of time and then close the change stream
     await closeChangeStream(timeInMs, changeStream);
    
  2. Вы можете отслеживать поток изменений, используя hasNext () .

     // See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
     const changeStream = collection.watch(pipeline);
    
     // Set a timer that will close the change stream after the given amount of time
     // Function execution will continue because we are not using "await" here
     closeChangeStream(timeInMs, changeStream);
    
     // We can use ChangeStream's hasNext() function to wait for a new change in the change stream.
     // If the change stream is closed, hasNext() will return false so the while loop will exit.
     // See https://mongodb.github.io/node-mongodb-native/3.3/api/ChangeStream.html for the ChangeStream docs.
     while (await changeStream.hasNext()) {
         console.log(await changeStream.next());
     }
    
  3. Вы можете контролировать Измените поток с помощью Stream API

     // See https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch for the watch() docs
     const changeStream = collection.watch(pipeline);
    
     // See https://mongodb.github.io/node-mongodb-native/3.3/api/ChangeStream.html#pipe for the pipe() docs
     changeStream.pipe(
         new stream.Writable({
             objectMode: true,
             write: function (doc, _, cb) {
                 console.log(doc);
                 cb();
             }
         })
     );
    
     // Wait the given amount of time and then close the change stream
     await closeChangeStream(timeInMs, changeStream);
    

Если ваша база данных MongoDB размещена в Atlas (https://cloud.mongodb.com), самое простое - создать Триггер . Atlas выполняет программирование кода потока изменений за вас, поэтому вам нужно только написать код, который преобразует события и индексирует их в Elasticsearch.

Дополнительная информация о работе с потоками изменений и триггерами доступна в мой блог . Полный пример кода для всех приведенных выше фрагментов доступен на GitHub .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...