У меня есть подписчик в пабе, который вызывает асинхронную функцию для обновления некоторых эластичных поисковых индексов, но поскольку подписчик очень быстро получает несколько запросов, я получаю конфликт версий отasticsearch.
Я попытался добавитьфлаг обработки и проверка флага на интервале, чтобы позволить функции запускаться только по одному за раз.
let unitsBeingProcessed = {};
let maxAttempts = 5;
Pubsub.on("unit_upsert", async(message) => {
try {
let unitId = message.data.unitId;
console.log(unitId + ' being processed is', unitsBeingProcessed[unitId]);
// seems like sometimes 2 requests end up in this block at the same time which shouldn't be possible I think..
if(!unitsBeingProcessed[unitId]){
unitsBeingProcessed[unitId] = true;
await elastic.addUnitToCalendar(message.data);
delete unitsBeingProcessed[unitId];
message.ack();
}
else if(unitsBeingProcessed[unitId]){
let attempts = maxAttempts;
// if unit is being processed already, check every 2 seconds to see if it's done yet
let flagCheck = setInterval(async () => {
attempts--;
if(!unitsBeingProcessed[unitId]){
clearInterval(flagCheck);
unitsBeingProcessed[unitId] = true;
await elastic.addUnitToCalendar(message.data);
delete unitsBeingProcessed[unitId];
message.ack();
}
else {
if(attempts <= 0){
clearInterval(flagCheck);
throw new Error('max tries to process unit exceeded');
}
console.log(unitId + ' is already being processed. Waiting 2s');
}
}, 2000);
}
} catch(err){
if(unitsBeingProcessed[message.data.unitId]){
delete unitsBeingProcessed[message.data.unitId];
}
Logger.error({err}, "Error in unit_availability_upsert_index");
}
})
Я ожидал, чтоastic.addUnitToCalendar () не будет запущен, если он уже был вызван и hasnне законченоВ большинстве случаев проверка интервала работает, но иногда она все еще вызывает функцию, прежде чем она будет выполнена.
Мои журналы показали, что unitBeingProcessed [unitId] не определено два раза подряд.Я ожидал, что он напечатает undefined один раз, а затем true во второй раз, так что это подождет.Есть идеи, что я делаю не так?