Решение, которое я решил использовать
Небольшое резюме пост-обсуждения
Как Иван Рубинсон дайте мне знать, что моя проблема просто проблема производителя-потребителя .
Поэтому я, наконец, решил использовать RabbitMQ , потому что у меня есть огромное количество веб-крюков для обработки. Для людей, имеющих небольшое количество запросов на обработку и не желающих использовать внешние инструменты O. Ответ Джонса - это действительно хороший способ решить проблему.
Дизайн решения
Наконец, я устанавливаю и настраиваю сервер RabbitMQ, затем для каждого источника моих веб-хуков я создал одну очередь.
Производитель
На стороне производителя, когда я получаю данные веб-перехвата, я отправляю сообщение в очередь, соответствующее источнику моего веб-перехвата, с сериализованной информацией, необходимой для фактической обработки идентификатора строки в базе данных, чтобы сделать сообщения как свет как можно.
Потребитель
На стороне потребителя я создаю функцию потребителя для каждой очереди происхождения и устанавливаю политику выборки равную единице, чтобы обрабатывать сообщения по одному в каждой очереди, наконец, я установил политику канала для ожидания сообщения подтверждения перед отправкой следующего сообщения. В этой конфигурации потребители переходят от сообщения к сообщению и решают начальную проблему.
Осуществление
Производитель
async function create(){
await amqp.connect(RBMQ_CONNECTION_STRING).then(async (conn)=>{
await conn.createChannel().then(async (ch)=>{
global.channel_publisher=ch;
});
});
}
async function sendtask(queue,task){
if(!global.channel_publisher){
await create();
}
global.channel_publisher.assertQueue(queue).then((ok)=>{
global.channel_publisher.sendToQueue(queue, Buffer.from(task));
});
}
Я использую функцию sendtask(queue,task)
в том месте, где я получил свой веб-хук
Потребительский
async function create(){
await amqp.connect(RBMQ_CONNECTION_STRING).then(async (conn)=>{
await conn.createChannel().then(async (ch)=>{
ch.prefetch(1);
global.channel_consumer=ch;
});
});
}
async function consumeTask(queue){
if(!global.channel_consumer){
await create();
}
global.channel_consumer.assertQueue(queue).then((ok)=>{
global.channel_consumer.consume(queue,(message)=>{
const args=message.content.toString().split(';');
await processWebhooks(args);
global.channel_consumer.ack(message);
});
});
}
Я использую consumeTask(queue)
, когда мне нужно было обработать новое происхождение веб-хуков. Также я использую его для инициализации моего приложения со всеми известными источниками в базе данных.