Последовательно выполнять веб-хуки, полученные в приложении узла - PullRequest
1 голос
/ 30 июня 2019

У меня есть приложение узла, использующее koa. Это получение webhooks из внешнего приложения на конкретных ресурсах.

Чтобы проиллюстрировать это, скажем, веб-крючок отправил мне с запросом POST объект этого типа:

{
  'resource_id':'<SomeID>',
  'resource_origin':'<SomeResourceOrigin>',
  'value' : '<SomeValue>'
}

Я хотел бы последовательно выполнять любые ресурсы, поступающие из одного источника, чтобы избежать десинхронизации ресурсов, связанных с моим выполнением.

Я думал использовать базу данных в качестве блокировки и использовать cron для последовательного выполнения моего процесса для каждого ресурса одного и того же происхождения.

Но я не уверен, что это самый эффективный метод.

Так что мой вопрос здесь:

Знаете ли вы какой-нибудь метод / пакет / сервис, позволяющий мне использовать глобальные очереди, которые я мог бы реализовать для каждого источника, чтобы ресурсы одного и того же источника выполнялись синхронно, без последовательной обработки всех веб-крючков? Если он не использует базу данных, лучше.

Ответы [ 2 ]

1 голос
/ 30 июня 2019

На вашем месте я бы начал сериализовать обработку всех ваших веб-хуков.Другими словами, я предлагаю вам обращаться с ними по одному, независимо от их происхождения.Используйте простую очередь внутри вашего приложения nodejs.

(Как только вы убедились, что работает правильно, вы можете затем сериализовать их на основе источника.)

Сначала структурируйте свою функцию (давайте вызовемэто handleOneWebhook()) для обработки входящих веб-заданий как функция Promise или асинхронная функция.Затем вы можете вызвать их, используя код с этим контуром.

let busy= false
async function handleManyWebhooks (queue) {
    if (busy) return
    busy = true
    while (queue.length > 0) {
        const item = queue.shift() 
        await handleOneWebhook (item)
    }
    busy  = false
}

queue, который вы передаете handleManyWebhooks, представляет собой простой массив, где каждый элемент является объектом из запроса POST.Вы используете его как очередь: push() каждый объект для помещения его в очередь и shift() для его удаления.

Затем, всякий раз, когда вы получаете объект POST webhook, вы используете код с этим контуром.

const queue = []
...

function handlePostObject (postObject) {
   queue.push(postObject)
   handleManyWebooks (queue)
}

Даже если вы вызываете handleManyWebhooks один раз для каждого входящего объекта, флаг busy гарантирует, что он обрабатывает только один объект за раз.

Обратите внимание, что это очень простое решение.Как только вы настроите его правильно, появятся два возможных уточнения.

  1. Используйте что-то более эффективное для вашей очереди, чем простой массив.shift() не очень быстрый.

  2. Создайте отдельный объект очереди со своим собственным флагом busy для каждого отдельного источника.После этого вы сможете распараллелить обработку веб-крючков из разных источников, продолжая сериализацию потока веб-крючков из каждого источника.

0 голосов
/ 01 июля 2019

Решение, которое я решил использовать

Небольшое резюме пост-обсуждения

Как Иван Рубинсон дайте мне знать, что моя проблема просто проблема производителя-потребителя .

Поэтому я, наконец, решил использовать RabbitMQ , потому что у меня есть огромное количество веб-крюков для обработки. Для людей, имеющих небольшое количество запросов на обработку и не желающих использовать внешние инструменты O. Ответ Джонса - это действительно хороший способ решить проблему.

Дизайн решения

Наконец, я устанавливаю и настраиваю сервер RabbitMQ, затем для каждого источника моих веб-хуков я создал одну очередь.

Производитель

На стороне производителя, когда я получаю данные веб-перехвата, я отправляю сообщение в очередь, соответствующее источнику моего веб-перехвата, с сериализованной информацией, необходимой для фактической обработки идентификатора строки в базе данных, чтобы сделать сообщения как свет как можно.

Потребитель

На стороне потребителя я создаю функцию потребителя для каждой очереди происхождения и устанавливаю политику выборки равную единице, чтобы обрабатывать сообщения по одному в каждой очереди, наконец, я установил политику канала для ожидания сообщения подтверждения перед отправкой следующего сообщения. В этой конфигурации потребители переходят от сообщения к сообщению и решают начальную проблему.

Осуществление

Производитель

   async function create(){
        await   amqp.connect(RBMQ_CONNECTION_STRING).then(async (conn)=>{
            await conn.createChannel().then(async (ch)=>{
                global.channel_publisher=ch;
            });
        });
    }

    async function sendtask(queue,task){
        if(!global.channel_publisher){
            await create();
        }
        global.channel_publisher.assertQueue(queue).then((ok)=>{
            global.channel_publisher.sendToQueue(queue, Buffer.from(task));
        });
    }

Я использую функцию sendtask(queue,task) в том месте, где я получил свой веб-хук

Потребительский

   async function create(){
      await amqp.connect(RBMQ_CONNECTION_STRING).then(async (conn)=>{
         await conn.createChannel().then(async (ch)=>{
            ch.prefetch(1);
            global.channel_consumer=ch;
          });
       });
    }

   async function consumeTask(queue){
       if(!global.channel_consumer){
           await create();
       }

       global.channel_consumer.assertQueue(queue).then((ok)=>{
          global.channel_consumer.consume(queue,(message)=>{
               const args=message.content.toString().split(';');

                    await processWebhooks(args);
                    global.channel_consumer.ack(message);
           });
       });
   }

Я использую consumeTask(queue), когда мне нужно было обработать новое происхождение веб-хуков. Также я использую его для инициализации моего приложения со всеми известными источниками в базе данных.

...