Node.js: как предотвратить одновременный вызов асинхронной функции - PullRequest
0 голосов
/ 31 января 2019

У меня есть подписчик в пабе, который вызывает асинхронную функцию для обновления некоторых эластичных поисковых индексов, но поскольку подписчик очень быстро получает несколько запросов, я получаю конфликт версий отasticsearch.

Я попытался добавитьфлаг обработки и проверка флага на интервале, чтобы позволить функции запускаться только по одному за раз.

let unitsBeingProcessed = {};
let maxAttempts = 5;

Pubsub.on("unit_upsert", async(message) => {
try {
    let unitId = message.data.unitId;
    console.log(unitId + ' being processed is', unitsBeingProcessed[unitId]);

    // seems like sometimes 2 requests end up in this block at the same time which shouldn't be possible I think..
    if(!unitsBeingProcessed[unitId]){
        unitsBeingProcessed[unitId] = true;
        await elastic.addUnitToCalendar(message.data);
        delete unitsBeingProcessed[unitId];
        message.ack();
    }
    else if(unitsBeingProcessed[unitId]){
        let attempts = maxAttempts;
        // if unit is being processed already, check every 2 seconds to see if it's done yet
        let flagCheck = setInterval(async () => {
            attempts--;
            if(!unitsBeingProcessed[unitId]){
                clearInterval(flagCheck);
                unitsBeingProcessed[unitId] = true;
                await elastic.addUnitToCalendar(message.data);
                delete unitsBeingProcessed[unitId];
                message.ack();
            }
            else {
                if(attempts <= 0){
                    clearInterval(flagCheck);
                    throw new Error('max tries to process unit exceeded');
                }
                console.log(unitId + ' is already being processed. Waiting 2s');
            }
        }, 2000);
    }
} catch(err){
    if(unitsBeingProcessed[message.data.unitId]){
        delete unitsBeingProcessed[message.data.unitId];
    }
    Logger.error({err}, "Error in unit_availability_upsert_index");
}
})

Я ожидал, чтоastic.addUnitToCalendar () не будет запущен, если он уже был вызван и hasnне законченоВ большинстве случаев проверка интервала работает, но иногда она все еще вызывает функцию, прежде чем она будет выполнена.

Мои журналы показали, что unitBeingProcessed [unitId] не определено два раза подряд.Я ожидал, что он напечатает undefined один раз, а затем true во второй раз, так что это подождет.Есть идеи, что я делаю не так?

1 Ответ

0 голосов
/ 31 января 2019

Это распространенная проблема многопоточности.Обычно вы бы использовали что-то под названием Mutex.Мьютекс подобен замку, который гарантирует, что только один человек может иметь блокировку в данный момент времени.По умолчанию в node.js нет ни одного, но, похоже, некоторые люди написали https://www.npmjs.com/package/async-mutex.. Попробуйте вместо флага интервала сделать это, это должно упростить задачу и решить вашу проблему.

...