Хранение данных за последние пару минут в списке повторного - PullRequest
1 голос
/ 25 января 2020

Я работаю над кэшированием базы данных системы и столкнулся с этой проблемой:

У нас есть несколько удаленных устройств, которые каждую секунду отправляют журнал на сервер! Поскольку у нас много таких устройств, мы не можем просто хранить все данные в обычной таблице базы данных. На самом деле нам нужен быстрый доступ к последним 5 минутам журналов, отправленных каждым устройством. Журналы находятся в json, но мы сопоставляем их с Go структурами. Каждый журнал имеет некоторые данные, такие как:

    type Log struct {
    DeviceID        string
    CompanyID       uint
    DeviceTime      time.Time
    Latitude        float64
    Longitude       float64
    Altitude        float64
    SpeedOTG        float32
}

Мы используем redis для хранения данных на оперативной памяти.

Моя проблема :

Задача - сохранить все журналы последних 5 минут каждого устройства в списке. Также я должен получить эти данные, когда предоставлен ключ cach . Я должен сделать что-то вроде очереди. Когда он заполнен (он содержит журнал последних 5 минут), каждая новая запись входит и старые записи должны go выходить. Как бы то ни было, если список все еще не содержит последние 5 минут, то ничего не выходит! Все расчеты выполняются на основе DeviceTime , указанного выше.

Сначала я подумал о списке журналов определенного размера (5 * 60) для хранения, но нет никакой гарантии, что каждую секунду появляется новый журнал!

1 Ответ

1 голос
/ 26 января 2020

Храните свои журналы в Redis Streams . Для вставки вы используете команду XADD . [Go Пример Redis ].

Вы можете хранить все устройства в одном потоке или использовать один поток на устройство.

Здесь, предполагая один поток на устройство и сохраняя набор идентификаторов устройств, вы должны вставить:

SADD devices myDeviceID
XADD logs:myDeviceID * DeviceID myDeviceID CompanyID myCompanyID DeviceTime "myDeviceTime"
     ... SpeedOTG "mySpeedOTG"

Набор devices предназначен для хранения списка идентификаторов устройств. в случае, если вы хотите получить данные для всех устройств, и вы не хотите полагаться на то, чтобы хранить их в go (на стороне клиента) или использовать SCAN. Чтобы извлечь журналы всех устройств, вы должны использовать шаблон, похожий на тот, который я описал в Какой тип данных использовать для этой реализации RedisCache? * Пример сценария , Lua в конце.

Одно из преимуществ потоков при сжатии полей: поскольку вы сохраняете свои поля согласованными (одни и те же поля в одинаковом порядке), Redis оптимизирует, не повторяя имена полей для каждой записи.

Используя *, вы позволяете Redis установить временную метку в качестве идентификатора записи. Вы можете использовать свою собственную метку времени в качестве идентификатора, если вы уверены, что она будет строго увеличиваться. Вы не можете добавить в поток с идентификатором ниже, чем последний добавленный идентификатор.

Вы можете использовать XTRIM, чтобы сохранить ваш список до 300 записей, но, как вы сказали, вы не можете полагаться на последовательные XADD в секунду.

Чтобы сохранить точное значение до последних 5 минут, вы можете использовать:

XRANGE logs:myDeviceID - + COUNT 1

Это возвращает первую (самую старую) запись. Вы сравниваете его с отметкой времени недавно добавленной последней записи, и, если она старше 5 минут, вы удаляете ее с помощью XDEL.

Вы повторяете, пока самая старая запись не станет старше чем за 5 минут.

Конечно, выполнение этой логики c на стороне клиента подразумевает множество циклов, влияющих на вашу производительность. Поэтому я предлагаю вам использовать Lua Script для этого на стороне сервера Redis. Вы можете использовать EVAL для прохождения скрипта каждый раз. Но лучше, если вы загрузите скрипт один раз, а затем используете EVALSHA .

Скрипт:

redis.call('SADD', KEYS[1], ARGV[2])

local latestID = redis.call('XADD', 'logs:'..ARGV[2], '*', 'DeviceID', ARGV[2],
                            'OtherFields', ARGV[3])
local latestTime = tonumber(string.sub(latestID, 1, string.find(latestID, '-') - 1))

local oldestID = redis.call('XRANGE', 'logs:'..ARGV[2], '-', '+', 'COUNT', '1')
oldestID = oldestID[1][1]
local oldestTime = tonumber(string.sub(oldestID, 1, string.find(oldestID, '-') - 1))

local maxTime = tonumber(ARGV[1])

while (latestTime - oldestTime) > maxTime do
    redis.call('XDEL', 'logs:'..ARGV[2], oldestID)
    oldestID = redis.call('XRANGE', 'logs:'..ARGV[2], '-', '+', 'COUNT', '1')
    oldestID = oldestID[1][1]
    oldestTime = tonumber(string.sub(oldestID, 1, string.find(oldestID, '-') - 1))
end
return { 'Added: '..latestID, 'Device: '..ARGV[2], 'Stream Key: logs:'..ARGV[2],
         'Length: '..redis.call('XLEN', 'logs:'..ARGV[2]) }

Параметры:

EVALSHA <shaId> 1 devices 300000 myDeviceID ...
                                            ^ARGV[3...]: the rest of fields
                                 ^ARGV[2]: DeviceID
                          ^ARGV[1]: logging time desired (ms), 5 minutes = 300000
                  ^KEYS[1]: the set key and prefix to device log keys
                ^numkeys

Используя потоки, вы получаете множество функций, таких как диапазон по метке времени или группы потребителей.

Но суть в том, что для сохранения журналов ограниченным временем используйте Lua скрипт. Вы можете сделать это и со списками, сохраняя записи журнала, закодированные в одну строку: timestamp:valuesSerialized и использовать аналогичный подход с Lua для добавления-проверки-трима в одной операции atomi c.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...