Я хочу сделать SELECT .. FOR UPDATE
, чтобы заблокировать строку в таблице, чтобы она могла быть обновлена атомарным способом, так как мог произойти одновременный запрос того же типа.
Я проводил некоторое тестированиено не ясно, FOR UPDATE
работает в pg-promise.task
.Я стараюсь избегать использования pg-promise.tx
, так как это потребует больше логики и, возможно, рекурсии, и того, и другого я хочу избежать, поскольку сценарий использования будет иметь высокую пропускную способность.
Обновление: После дополнительных исследований и испытаний я обнаружил, что использование task
с SELECT .. FOR UPDATE
дает мне неожиданный результат.Объяснение кода ниже.
submitUserOnline:(pgdb, u_uuid, socketConnectionList, user_websock) =>{
return new Promise((resolve,reject) =>{
// pgdb.tx({mode} t => {
pgdb.task( t => {
return t.one('SELECT * FROM users WHERE user_uuid = $1', [u_uuid]
.then(user =>{
// nothing happens here right now, but may in future, including for completeness. May cancel request based off some comparisons.
return t.any('SELECT * FROM users_online WHERE user_uuid = $1 FOR UPDATE',[u_uuid]
.then(result =>{
if(result.length > 0){
console.error('duplicate connection found ' + user_websock.uuid);
if(socketConnectionList[result[0].web_sock_uuid] !== undefined){
console.error('drop connection' + result[0].web_sock_uuid);
socketConnectionList[result[0].web_sock_uuid].wsc.close(4020, 'USER_RECONN');
}
console.error('duplicate connection found - update next ' + user_websock.uuid);
return t.none('UPDATE users_online SET web_sock_uuid = $1 WHERE user_uuid = $2', [user_websock, u_uuid])
.then(res =>{
console.error('UPDATE res: ' + res);
})
.catch(err =>{
console.error('UPDATE err: ' + err);});
}else{
// not reached in test case
console.error('no duplicate found ' + user_websock.uuid);
return t.none(INSERT INTO users_online.....ect ect
}
});
})
.then(res =>{
console.error('>>>task/tx res: ' + res);
resolve({msg: "OK"});
})
.catch(err =>{
console.error('>>>task/tx err: ' + err);
if(err.code ==== '40001'){// recursion for when called as 'tx'
console.error('>>>task/tx err - call recurse');
module.exports.submitUserOnline(pgdb, u_uuid, socketConnectionList, user_websock)
.then(res =>{
console.error('>>>task/tx err - call recurse - res ' + res);
resolve({msg: "OK"});
})
.catch( err =>{
console.error('>>>task/tx err - call recurse - err: ' + err);
reject({msg:"FAILED"});
});
}
});
});
}
const mode = new TransactionMode({
tiLevel: isolationLevel.serializable,
readOnly: false,
deferrable: true
});
submitUserOnline
вызывается обработчиком веб-сокета.В моем тестовом примере у меня есть массив из 10 элементов (тот же user_uuid), который запускает все клиентские соединения в цикле for.В основном это подключение к веб-сокету сервера, этот сокет проверяет таблицу users_online
для конкретного пользователя, если этот пользователь уже находится в таблице, уничтожает устаревшее соединение и обновляет web_sock_uuid
в таблице, это гдевозникают проблемы (иногда это работает, иногда нет, проблема возникает при запуске 10 одновременных подключений).Когда пользовательская строка web_sock_uuid
имеет значение UPDATE
', другие параллельные соединения, по-видимому, правильно блокируются на SELECT .. FOR UPDATE(SLFU)
, когда выполняется UPDATE
, then()
не всегда запускается до выпуска следующего SLFU
.Кажется, это представляет себя в виде ожидающего SLFU
возврата старого web_sock_uuid
строки перед предшествующим UPDATE
.В одном случае один и тот же устаревший web_sock_uuid возвращался 4 раза подряд.
Если я переключаюсь с метода task
на метод tx
, который требует рекурсивного вызова, приведенный выше код работает какожидается, хотя это требует много повторений.