У меня есть фрагмент кода, в котором я использую события из RabbitMQ и сохраняю события (3 типа событий A, B, C) в 2 разных базах данных A, B. Я могу поместить sh в базу данных A без каких-либо проблем, но мне нужно подождать, пока ни одно из событий не будет по крайней мере 100 до sh событий типа B и C в базе данных или пока код не попытается заполнить очередь за последние 5 минут из вызываемой точки saveToDb. Я не могу понять, как дождаться событий для B и C, а затем сохранить данные в базе данных.
Обратите внимание, что событие A будет go в базе данных A и событие B, C будет go в базе данных B.
Я написал следующий фрагмент кода.
import { Channel, ConsumeMessage } from 'amqplib';
const BATCH_SIZE = 100;
var eventBQueue = [];
var eventAQueue = [];
const shiftElements = (message) => {
if ( message.length >= BATCH_SIZE) {
const batch= message.splice(0, BATCH_SIZE);
return batch;
}
return message;
}
const saveToDb = async (messages, database) => {
const eventsA = filterEventsA(messages);
const eventsB = filterEventsB(messages);
const eventsC = filterEventsC(eventsB);
const promises = [];
promises.push(databaseAsync.publish(eventsC));
if (eventBQueue.length < BATCH_SIZE) {
eventBQueue.push.apply(eventBQueue, eventsB);
}
else {
var eventsBBatched = shiftElements(eventBQueue);
promises.push(database.publish(eventsBBatched, EVENTS_TABLE_A));
}
if (eventAQueue.length < BATCH_SIZE) {
eventAQueue.push.apply(eventAQueue, eventsA);
}
else {
var eventsABatched = shiftElements(eventAQueue);
promises.push(database.publish(eventsABatched, EVENTS_TABLE_B));
}
return new Promise((resolve, reject) => {
Promise.all(promises).then(resolve).catch(reject);
});
}
export const process = async (database,
rabbitmq): Promise<void> => {
return new Promise((resolve, _) => {
rabbitmq.consume(async (channel, message: ConsumeMessage) => {
const messages = somefunction(message);
await saveToDb(messages,database)
.then(_ => {
try {
channel.ack(message)
} catch (error) {
}
})
.catch((error) => {
try {
console.error('error');
channel.ack(message)
} catch (error) {
}
});
});
somefunction(resolve)
});
}
Теперь я хочу добавить некоторое условие в if
, где no of events < Batch_SIZE
для ожидания данных от rabbitMQ и сохранения в базе данных, когда eventAQueue и eventBQueue имеют соответствующий размер или есть ограничение по времени ожидания эти данные. Но я не знаю, как это добавить.