Гарантируйте последовательный порядок выполнения произвольного количества обратных вызовов - PullRequest
0 голосов
/ 22 октября 2018

Я подписываюсь на очередь событий и каждый раз, когда я получаю событие, мне нужно сделать асинхронный HTTP-запрос и опубликовать ответ, в том же порядке, в котором я получил событие в другую очередь.Поэтому, в основном, подпишитесь на приложение pub / sub, сделайте некоторое асинхронное вычисление и опубликуйте вычисленный результат без другого приложения pub / sub.Поскольку у меня нет заданного количества обратных вызовов для выполнения, я не могу использовать async.series.

Я думал о том, чтобы создать очередь, которая позволила бы мне вставить сообщение и идентификатор,и будет выдавать событие каждый раз, когда вставленный идентификатор будет равен последнему выданному идентификатору + 1. Затем я буду подписываться на эту очередь и публиковать в своем приложении pub / sub каждый раз, когда получаю событие из своей очереди, поскольку это будет гарантироватьпоследовательный порядок.

То, что мне нужно сделать, кажется очень распространенной задачей, но я не смог найти модуль для этого.Есть ли в NPM что-то, что уже делает это, или есть лучший способ выполнить то, что мне нужно?

1 Ответ

0 голосов
/ 22 октября 2018

Я закончил тем, что создал свой собственный модуль, чтобы делать то, что мне нужно, то есть иметь возможность подписаться на приложение pub / sub и публиковать в другом приложении, выполняя некоторую асинхронную работу между ними, но сохраняя порядок, в котором сообщенияпринимаются подписчиком.

С этим модулем я могу выполнить orderedPubSub.publish(id, message) в обратном вызове асинхронной работы, которую мне нужно выполнить при получении события из первого приложения, и otherApplication.publish(message) в on("message") моего модуля OrderedPubSub.

Хотелось бы, чтобы был другой способ сделать это или модуль уже в NPM.

const EventEmitter = require('events');

class OrderedPubSub extends EventEmitter {

  constructor(initialId = 0) {
    super()
    this.lastPublishedId = initialId
    this.messages = {}
  }

  publish(id, message) {
    this.messages[id] = message
    this.publishAllAvailable()
  }

  publishAllAvailable() {
    let messageId;
    while((messageId = this.lastPublishedId + 1) in this.messages) {
      const message = this.messages[messageId]
      delete this.messages[messageId]
      this.lastPublishedId++
      this.emit("message", message)
    }
  }
}

const orderedPubSub = new OrderedPubSub();

orderedPubSub.on('message', message => {
  console.log(`Received message: "${message}"`)
});

orderedPubSub.publish(3, "third message")
orderedPubSub.publish(2, "second message")
orderedPubSub.publish(4, "fourth message")
orderedPubSub.publish(10, "tenth message")
orderedPubSub.publish(1, "first message")

//outputs
// Received message: "first message"
// Received message: "second message"
// Received message: "third message"
// Received message: "fourth message"
...