Предотвратить параллельную обработку в NodeJS - PullRequest
0 голосов
/ 07 октября 2018

Мне нужен NodeJS для предотвращения одновременных операций для одних и тех же запросов.Из того, что я понимаю, если NodeJS получает несколько запросов, вот что происходит:

REQUEST1 ---> DATABASE_READ
REQUEST2 ---> DATABASE_READ
DATABASE_READ complete ---> EXPENSIVE_OP() --> REQUEST1_END
DATABASE_READ complete ---> EXPENSIVE_OP() --> REQUEST2_END

Это приводит к выполнению двух дорогостоящих операций.Мне нужно что-то вроде этого:

REQUEST1 ---> DATABASE_READ
DATABASE_READ complete ---> DATABASE_UPDATE
DATABASE_UPDATE complete ---> REQUEST2 ---> DATABASE_READ ––> REQUEST2_END
                         ---> EXPENSIVE_OP() --> REQUEST1_END

Вот как это выглядит в коде.Проблема заключается в том, что окно между тем, когда приложение начинает читать значение кэша, и когда оно заканчивает запись в него.В этом окне параллельные запросы не знают, что уже запущен один запрос с тем же itemID.

app.post("/api", async function(req, res) {
    const itemID = req.body.itemID

    // See if itemID is processing
    const processing = await DATABASE_READ(itemID)
    // Due to how NodeJS works, 
    // from this point in time all requests
    // to /api?itemID="xxx" will have processing = false 
    // and will conduct expensive operations

    if (processing == true) {
        // "Cheap" part
        // Tell client to wait until itemID is processed
    } else {
        // "Expensive" part
        DATABASE_UPDATE({[itemID]: true})
        // All requests to /api at this point
        // are still going here and conducting 
        // duplicate operations.
        // Only after DATABASE_UPDATE finishes, 
        // all requests go to the "Cheap" part
        DO_EXPENSIVE_THINGS();
    }
}

Редактировать

Конечно, я могу сделать что-то вроде этого:

const lockedIDs = {}
app.post("/api", function(req, res) {
    const itemID = req.body.itemID
    const locked = lockedIDs[itemID] ? true : false // sync equivalent to async DATABASE_READ(itemID)
    if (locked) {
        // Tell client to wait until itemID is processed
        // No need to do expensive operations
    } else {
        lockedIDs[itemID] = true // sync equivalent to async DATABASE_UPDATE({[itemID]: true})
        // Do expensive operations
        // itemID is now "locked", so subsequent request will not go here
    }
}

lockedIDs здесь ведет себя как оперативная синхронная база данных значений ключей.Это нормально, если это всего лишь один сервер.Но что, если есть несколько экземпляров сервера?Мне нужно отдельное хранилище кеша, как у Redis.И я могу получить доступ к Redis только асинхронно .Так что, к сожалению, это не сработает.

Ответы [ 2 ]

0 голосов
/ 07 октября 2018

Ладно, позвольте мне разобраться с этим.

Итак, проблема, с которой я столкнулся в этом вопросе, состоит в том, что вы настолько абстрагировали проблему, что вам действительно трудно помочь оптимизировать ее.Непонятно, что делает ваш «длительный процесс», и то, что он делает, повлияет на решение проблемы обработки нескольких одновременных запросов.Что делает ваш API, что вы беспокоитесь о потреблении ресурсов?

Из вашего кода сначала я догадался, что вы запускаете какое-то длительное задание (например, преобразование файлов или что-то в этом роде), но затем некоторые изменения и комментарии заставляют меня думать, что это может бытьпросто сложный запрос к базе данных, который требует много вычислений, чтобы правильно и, следовательно, вы хотите кэшировать результаты запроса.Но я также мог видеть, что это что-то другое, например, запрос к группе сторонних API, которые вы собираете, или что-то в этом роде.У каждого сценария есть свой нюанс, который меняет то, что является оптимальным.

Тем не менее, я объясню сценарий «кэширования», и вы можете сказать мне, если вас больше интересует одно из других решений.

По сути, вы уже находитесь в правильном поле для кэша.Если вы еще этого не сделали, я бы порекомендовал взглянуть на cache-manager , который немного упрощает ваш шаблон для этих сценариев (и давайте вам установим недействительность кэша и даже обеспечим многоуровневое кэширование).Часть, которую вы упускаете, заключается в том, что вы, по сути, должны всегда отвечать всем, что у вас есть в кеше, и заполнять кеш вне области любого данного запроса.Используя ваш код в качестве отправной точки, что-то вроде этого (исключая все попытки .. ловушки и проверки ошибок и так далее для простоты):

// A GET is OK here, because no matter what we're firing back a response quickly, 
//      and semantically this is a query
app.get("/api", async function(req, res) {
    const itemID = req.query.itemID

    // In this case, I'm assuming you have a cache object that basically gets whatever
    //    is cached in your cache storage and can set new things there too.  
    let item = await cache.get(itemID)

    // Item isn't in the cache at all, so this is the very first attempt.  
    if (!item) {
        // go ahead and let the client know we'll get to it later. 202 Accepted should 
        //   be fine, but pick your own status code to let them know it's in process. 
        //   Other good options include [503 Service Unavailable with a retry-after 
        //   header][2] and [420 Enhance Your Calm][2] (non-standard, but funny)
        res.status(202).send({ id: itemID });

        // put an empty object in there so we know it's working on it. 
        await cache.set(itemID, {}); 

        // start the long-running process, which should update the cache when it's done
        await populateCache(itemID); 
        return;
    }
    // Here we have an item in the cache, but it's not done processing.  Maybe you 
    //     could just check to see if it's an empty object or not, but I'm assuming 
    //     that we've setup a boolean flag on the cached object for when it's done.
    if (!item.processed) {
        // The client should try again later like above.  Exit early. You could 
        //    alternatively send the partial item, an empty object, or a message. 
       return res.status(202).send({ id: itemID });
    } 

    // if we get here, the item is in the cache and done processing. 
    return res.send(item);
}

Теперь я не знаю точно, что все ваши вещиделает, но если это я, populateCache сверху - довольно простая функция, которая просто вызывает любой сервис, который мы используем для выполнения длительной работы, а затем помещает его в кеш.

async function populateCache(itemId) {
   const item = await service.createThisWorkOfArt(itemId);
   await cache.set(itemId, item); 
   return; 
}

Дайте мне знать, если это не ясно или ваш сценарий действительно отличается от того, что я предполагаю.

Как уже упоминалось в комментариях, этот подход будет охватывать большинство обычных проблем, которые могут возникнуть у вас в описанном сценарии, но он все равно позволит двум запросам отключить длительный процесс, если они поступят быстрее, чемзапись в кэш-хранилище (например, Redis).Я считаю, что вероятность того, что это произойдет, довольно мала, но если вы действительно обеспокоены этим, то следующей более параноидальной версией этого будет простое удаление кода продолжительного процесса из вашего веб-API в целом.Вместо этого ваш API просто записывает, что кто-то просил, чтобы такое произошло, и если в кеше ничего нет, то отвечайте, как я делал выше, но полностью удалите блок, который фактически вызывает populateCache в целом.

Вместо этого у вас должен быть запущен отдельный рабочий процесс, который периодически (как часто зависит от вашего бизнес-сценария) проверяет кэш на необработанные задания и запускает работу для их обработки.Делая это таким образом, даже если у вас есть 1000 одновременных запросов на один и тот же элемент, вы можете гарантировать, что обрабатываете его только один раз.Недостатком, конечно, является то, что вы добавляете периодичность проверки к задержке получения полностью обработанных данных.

0 голосов
/ 07 октября 2018

Вы можете создать локальный объект Map (в памяти для синхронного доступа), который содержит любой itemID в качестве ключа, который обрабатывается.Вы могли бы сделать значение для этого ключа обещанием, которое разрешается независимо от результата, полученного от любого, кто ранее обработал этот ключ.Я думаю об этом как о привратнике.Он отслеживает, какие itemIDs обрабатываются.

Эта схема сообщает будущим запросам на тот же itemID ждать и не блокирует другие запросы - я подумал, что это было важно, а не просто использование глобальной блокировки для всех связанных запросовк обработке itemID.

Затем, как часть вашей обработки, вы сначала проверяете локальный объект Map.Если этот ключ находится там, то он в настоящее время обрабатывается.Затем вы можете просто дождаться обещания от объекта Map, чтобы увидеть, когда он будет обработан, и получить какой-либо результат от предыдущей обработки.

Если его нет в объекте Map, то он не обрабатывается сейчас, и вы можете сразупоместите это в Карту, чтобы отметить это как "в процессе".Если вы установите обещание в качестве значения, то вы можете разрешить это обещание с любым результатом, который вы получите от этой обработки объекта.

Любые другие поступающие запросы в конечном итоге просто будут ждать этого обещания, и вы будететаким образом, обрабатывать этот идентификатор только один раз.Первый, который будет начинаться с этого идентификатора, будет обрабатывать его, а все остальные запросы, поступающие во время его обработки, будут использовать тот же общий результат (таким образом сохраняя дублирование ваших тяжелых вычислений).

Я пытался кодироватьпример, но не совсем понял, что ваш псевдо-код пытался сделать достаточно хорошо, чтобы предложить пример кода.

Системы, подобные этой, должны иметь идеальную обработку ошибок, чтобы все возможные пути ошибок обрабатывали Mapи обещание встроено в Map правильно.

На основе вашего довольно легкого примера псевдокода приведен аналогичный пример псевдокода, который иллюстрирует приведенную выше концепцию:

const itemInProcessCache = new Map();

app.get("/api", async function(req, res) {
    const itemID = req.query.itemID
    let gate = itemInProcessCache.get(itemID);
    if (gate) {
        gate.then(val => {
            // use cached result here from previous processing
        }).catch(err => {
            // decide what to do when previous processing had an error
        });
    } else {
        let p = DATABASE_UPDATE({itemID: true}).then(result => {
            // expensive processing done
            // return final value so any others waiting on the gate can just use that value
            // decide if you want to clear this item from itemInProcessCache or not
        }).catch(err => {
            // error on expensive processing

            // remove from the gate cache because we didn't get a result
            // expensive processing will have to be done by someone else
            itemInProcessCache.delete(itemID);
        });
        // mark this item as being processed
        itemInProcessCache.set(itemID, p);
    }
});

Примечание. Этоопирается на однопоточность node.js.Никакой другой запрос не может быть запущен до тех пор, пока не вернется обработчик запросов, так что itemInProcessCache.set(itemID, p); будет вызван до того, как начнутся любые другие запросы для этого itemID.


Кроме того, я не очень хорошо знаю базы данных,но это очень похоже на функцию, которая может быть встроена в хорошую многопользовательскую базу данных или иметь вспомогательные функции, которые облегчают эту задачу, поскольку нередко возникает идея не хотеть, чтобы несколько запросов пытались выполнить одну и ту же работу базы данных (илиеще хуже, мучая друг друга работой).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...