Дождитесь заполнения очереди в RabbitMQ с помощью Promise - PullRequest
0 голосов
/ 17 июня 2020

У меня есть фрагмент кода, в котором я использую события из RabbitMQ и сохраняю события (3 типа событий A, B, C) в 2 разных базах данных A, B. Я могу поместить sh в базу данных A без каких-либо проблем, но мне нужно подождать, пока ни одно из событий не будет по крайней мере 100 до sh событий типа B и C в базе данных или пока код не попытается заполнить очередь за последние 5 минут из вызываемой точки saveToDb. Я не могу понять, как дождаться событий для B и C, а затем сохранить данные в базе данных.

Обратите внимание, что событие A будет go в базе данных A и событие B, C будет go в базе данных B.

Я написал следующий фрагмент кода.

import { Channel, ConsumeMessage } from 'amqplib';

const BATCH_SIZE = 100;

var eventBQueue = [];
var eventAQueue = [];

const shiftElements = (message) => {

    if ( message.length >= BATCH_SIZE) {
        const batch= message.splice(0, BATCH_SIZE);
        return batch;
    }
    return message;
}

const saveToDb = async (messages, database) => {
    const eventsA = filterEventsA(messages);
    const eventsB = filterEventsB(messages);
    const eventsC = filterEventsC(eventsB);

    const promises = [];
    promises.push(databaseAsync.publish(eventsC));


    if (eventBQueue.length < BATCH_SIZE) { 
        eventBQueue.push.apply(eventBQueue, eventsB);
    }
    else { 
        var eventsBBatched = shiftElements(eventBQueue);
        promises.push(database.publish(eventsBBatched, EVENTS_TABLE_A));
    }

    if (eventAQueue.length < BATCH_SIZE) {
        eventAQueue.push.apply(eventAQueue, eventsA);
    }
    else {
        var eventsABatched = shiftElements(eventAQueue);
        promises.push(database.publish(eventsABatched, EVENTS_TABLE_B));
    }

    return new Promise((resolve, reject) => {
        Promise.all(promises).then(resolve).catch(reject);
    });
}

export const process = async (database,
    rabbitmq): Promise<void> => {
    return new Promise((resolve, _) => {
        rabbitmq.consume(async (channel, message: ConsumeMessage) => {
            const messages = somefunction(message);
            await saveToDb(messages,database)
                .then(_ => {
                    try {
                        channel.ack(message)
                    } catch (error) {

                    }
                })
                .catch((error) => {
                    try {
                        console.error('error');
                        channel.ack(message)
                    } catch (error) {

                    }
                });
        });
        somefunction(resolve)
    });
}

Теперь я хочу добавить некоторое условие в if, где no of events < Batch_SIZE для ожидания данных от rabbitMQ и сохранения в базе данных, когда eventAQueue и eventBQueue имеют соответствующий размер или есть ограничение по времени ожидания эти данные. Но я не знаю, как это добавить.

...