Mongo Change Streams, запущенные несколько раз (вид): приложение Node, работающее несколько экземпляров - PullRequest
0 голосов
/ 24 августа 2018

Приложение My Node использует потоки изменений Mongo, и приложение запускает более 3 экземпляров в рабочем состоянии (в конечном итоге, так как это будет увеличиваться по мере роста).Итак, когда изменение происходит в потоке изменений, функциональность запускается столько раз, сколько существует процессов.

Как настроить все так, чтобы поток изменений запускался только один раз?

Вот что яВы получили:

const options = { fullDocument: "updateLookup" };

const filter = [
  {
    $match: {
      $and: [
        { "updateDescription.updatedFields.sites": { $exists: true } },
        { operationType: "update" }
      ]
    }
  }
];

const sitesStream = Client.watch(sitesFilter, options);

// Start listening to site stream
sitesStream.on("change", async change => {
  console.log("in site change stream", change);
  console.log(
    "in site change stream, update desc",
    change.updateDescription
  );

  // Do work...
  console.log("site change stream done.");
  return;
});

Ответы [ 2 ]

0 голосов
/ 30 августа 2018

Хотя вариант Kafka звучит интересно, было много работы по инфраструктуре на платформе, с которой я не знаком, поэтому я решил пойти с чем-то немного ближе к дому для меня, отправив сообщение MQTT на небольшой стенд приложение, позволяющее серверу MQTT отслеживать сообщения на предмет уникальности.

siteStream.on("change", async change => {
  console.log("in site change stream);
  const mqttClient = mqtt.connect("mqtt://localhost:1883");
  const id = JSON.stringify(change._id._data);
  // You'll want to push more than just the change stream id obviously...
  mqttClient.on("connect", function() {
    mqttClient.publish("myTopic", id);
    mqttClient.end();
  });
});

Я все еще работаю над окончательной версией сервера MQTT, но метод оценки уникальности сообщений, вероятно, будет хранить массив идентификаторов потоков изменений в памяти приложения, поскольку нет необходимости сохранять их и оценивать, продолжить работу в зависимости от того, был ли ранее виден этот идентификатор потока изменений.

var mqtt = require("mqtt");
var client = mqtt.connect("mqtt://localhost:1883");
var seen = [];
client.on("connect", function() {
  client.subscribe("myTopic");
});
client.on("message", function(topic, message) {
  context = message.toString().replace(/"/g, "");
  if (seen.indexOf(context) < 0) {
    seen.push(context);
    // Do stuff
  }
});

Это не относится к безопасности и т. Д., Но вы поняли.

0 голосов
/ 24 августа 2018

Похоже, вам нужен способ разделения обновлений между экземплярами.Вы смотрели в Apache Kafka?По сути, вы должны иметь одно приложение, которое записывает данные изменений в раздел Kafka Topic, и ваше приложение-узел будет потребителем Kafka.Это гарантирует, что только один экземпляр приложения когда-либо получит обновление.

В зависимости от вашей стратегии секционирования, вы можете даже гарантировать, что обновления для одной и той же записи всегда будут поступать в одно и то же приложение узла (если вашему приложению необходимо поддерживать свое собственноегосударство).В противном случае вы можете распространять обновления в циклическом режиме.

Самым большим преимуществом использования Kafka является то, что вы можете добавлять и удалять экземпляры без необходимости настройки конфигураций.Например, вы можете запустить один экземпляр, и он будет обрабатывать все обновления.Затем, как только вы запускаете другой экземпляр, каждый из них начинает обрабатывать половину нагрузки.Вы можете продолжить этот шаблон для любого количества экземпляров (и вы можете настроить раздел так, чтобы иметь тысячи разделов, если хотите), что является мощью группы потребителей Kafka.Сокращение работает в обратном порядке.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...