Как правильно использовать команду GET SET для redis в nodejs под параллелизмом? - PullRequest
0 голосов
/ 31 мая 2018

В моем параллельном проекте мне нужно получить значение в Redis, затем обновить его и установить в Redis.Как и в следующем коде, ожидаемый результат должен быть 3000, но я не могу получить правильный результат.Последовательность может быть неправильной, возможно, получить правильную последовательность и правильный результат?Стоит ли использовать какой-нибудь замок?

import * as redis from 'redis';

let config: redis.ClientOpts = {
    host: '127.0.0.1',
    port: 6379
};

let redisClient: redis.RedisClient = new redis.RedisClient(config);
redisClient.set('num', '0');

(async () => {
    for (let i = 0; i < 1000; i++) {
        await add ();
    }
})();

(async () => {
    for (let i = 0; i < 1000; i++) {
        await add ();
    }
})();

(async () => {
    for (let i = 0; i < 1000; i++) {
        await add ();
    }
})();

// I know incr command, this is just an example.
async function add () { 
    let numStr: string = await get('num');
    let num: number = Number(numStr);
    num++;
    await set('num', String(num));
    console.log(num);
}

async function get (key: string): Promise<string> {
    return new Promise<string>((resovle, reject) => {
        redisClient.get(key, (err: Error, reply: string) => {
            if (err) {
                console.error(err);
                reject(err);
            }
            resovle(reply);
        })
    });
}

async function set (key: string, value: string): Promise<string> {  
    return new Promise<string>((resovle, reject) => {
        redisClient.set(key, value, (err: Error, reply: string) => {
            if (err) {
                console.error(err);
                reject(err);
            }
            resovle(reply);
        })
    });
}

Ответы [ 4 ]

0 голосов
/ 01 июня 2018

Для решения вашей проблемы вам нужно выполнить все действия (получить, увеличить, установить), выполняемые функцией add () внутри транзакции, для достижения атомарности - это можно сделать в Redis следующими способами:

  • Транзакции Redis (MULTI / EXEC / WATCH) с оптимистической блокировкой (повторные попытки)
  • Скрипт Redis Lua: создайте скрипт lua, который будет выполнять функцию добавления, и выполните ее
0 голосов
/ 01 июня 2018

Вы столкнулись с этой проблемой, потому что не ожидаете завершения каждой функции IFFE.Следовательно, они работают параллельно и не дадут вам результат, который вы ищете.

Чтобы решить эту проблему, вам нужно await для каждой функции IFFE.Теперь мы можем await только если функция в async.Итак, я преобразовал функцию IFFE в обычную функцию.Вот реализация

async function run() {
  for (let i = 0; i < 1000; i++) {
    await add();
  }
}

// I know incr command, this is just an example.
async function add() {
  let numStr = await get('num');
  let num = Number(numStr);
  num++;
  await set('num', String(num));
  console.log(num);
}

function get(key) {
  return new Promise((resovle, reject) => {
    redisClient.get(key, (err, reply) => {
      if (err) {
        console.error(err);
        reject(err);
      }
      resovle(reply);
    });
  });
}

async function set(key, value) {
  return new Promise((resovle, reject) => {
    redisClient.set(key, value, (err, reply) => {
      if (err) {
        console.error(err);
        reject(err);
      }
      resovle(reply);
    })
  });
}

async function runner() {
  await run();
  await run();
  await run();

}
runner();
0 голосов
/ 01 июня 2018

Я решил, спасибо всем.Вот мой кодref: https://redis.io/topics/distlock

import * as redis from 'redis';
import * as crypto from 'crypto';

const lockScript: string = 'return redis.call("set", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])';
const unlockScript: string = 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end';

let config: redis.ClientOpts = {
    host: '127.0.0.1',
    port: 6379
};

let redisClient: redis.RedisClient = new redis.RedisClient(config);
redisClient.set('num', '0');

(async () => {
    for (let i = 0; i < 1000; i++) {
        await add ();
    }
})();

(async () => {
    for (let i = 0; i < 1000; i++) {
        await add ();
    }
})();

(async () => {
    for (let i = 0; i < 1000; i++) {
        add ();
    }
})();

async function add () {
    let resourse: string = 'lock:num';
    let uniqueStr: string = crypto.randomBytes(10).toString('hex');
    await attemptLock(resourse, uniqueStr);
    let num: number = Number(await get('num'));
    num++;
    console.log(num);
    await set('num', String(num));
    await unlock(resourse, uniqueStr);
}

async function loop (resourse: string, uniqueStr: string, ttl?: string) {
    return new Promise<any>((resolve, reject) => {
        setTimeout(async () => {
            let result: any = await lock(resourse, uniqueStr, ttl);
            resolve(!!result);
        }, 10);
    });
}

async function attemptLock (resourse: string, uniqueStr: string, ttl?: string) {
    let result = await loop(resourse, uniqueStr, ttl);
    if (result) {
        return true;
    } else {
        return attemptLock(resourse, uniqueStr, ttl);
    }
}

async function lock (resourse: string, uniqueStr: string, ttl?: string) {
    ttl = ttl ? ttl : '30000';

    return new Promise<any>((resolve, reject) => {
        redisClient.eval(lockScript, 1, resourse, uniqueStr, ttl, (err: Error, reply: any) =>{
            if (err) {
                console.error(err);
                reject(err);                
            }

            // reply will be nil when the key exists, on the contrary it will be "OK"
            resolve(reply);
        });
    });
}

async function unlock (resourse: string, uniqueStr: string) {
    return new Promise<any>((resolve, reject) => {
        redisClient.eval(unlockScript, 1, resourse, uniqueStr, (err: Error, reply: any) =>{
            if (err) {
                console.error(err);
                reject(err);
            }

            resolve(reply);
        });
    });
}

async function get (key: string): Promise<string> {
    return new Promise<string>((resovle, reject) => {
        redisClient.get(key, (err: Error, reply: string) => {
            if (err) {
                console.error(err);
                reject(err);
            }
            resovle(reply);
        })
    });
}

async function set (key: string, value: string): Promise<string> {
    return new Promise<string>((resovle, reject) => {
        redisClient.set(key, value, (err: Error, reply: string) => {
            if (err) {
                console.error(err);
                reject(err);
            }
            resovle(reply);
        })
    });
}
0 голосов
/ 31 мая 2018

Redis предоставляет более гибкий способ «атомарного» выполнения транзакций с использованием подхода MULTI-EXEC.

Представьте, что вы работаете в банковской системе, а клиент запрашивает перевод денежных средств.Вам необходимо сначала сделать скидку на деньги со счета клиента, а затем перевести их на другой счет.Между этими операциями может произойти сбой одной из них.Итак, мы используем транзакции.

Транзакция является атомарной, что означает, что либо все операции выполняются, ни одна из них не выполняется.Базы данных имеют реализацию управления блокировками и параллелизмом, которая обеспечивает это, в то время как это не оказывает существенного влияния на производительность системы.

Пример:

var redis  = require("redis"),
    client = redis.createClient(), multi;

async function add () { 
    multi = client.multi();
    let numStr: string = await multi.get('num');
    let num: number = Number(numStr);
    num++;
    await multi.set('num', String(num));
    multi.exec(function (err, replies) {
        console.log(replies);});
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...