Очередь или другие методы для обработки тиковых данных? - PullRequest
1 голос
/ 07 января 2020

В нашей электронной системе c нам необходимо выполнить расчеты на основе тиковых данных из 100+ контрактов.

Тиковые данные контрактов не получены в одном сообщении. Одно сообщение включает в себя только данные для одного контракта. Временная метка контрактов немного отличается (иногда большая разница, но давайте проигнорируем этот случай).

eg: (first column is timestamp. Second is contract name)
below 2 data has 1ms diff
10:34:03.235,10002007,510050C2006A03500  ,0.0546
10:34:03.236,10001909,510050C2003A02750  ,0.3888

below 2 data has 3ms diff
10:34:03.594,10002154,510300C2003M03700  ,0.4985
10:34:03.597,10002118,510300C2001M03700  ,0.4514

Только данные с изменением цены будут иметь данные. Поэтому я не могу посчитать номер контракта, чтобы узнать, получил ли я все данные для этого тика.

Но с другой стороны, мы не хотим ждать, пока получим все данные для тика, потому что иногда данные могут задерживаться на долгое время, мы захотим исключить их.

Требуется низкая задержка. Поэтому я думаю, что мы определим окно - скажем, 50 мс - и начнем вычислять, основываясь на любых данных, которые мы получили за последние 50 мс.

Каков наилучший способ обработки такого варианта использования?

Первоначально я хочу использовать поток redis для поддержки небольшой очереди, чтобы при получении данных контракта я направлял sh в поток redis. Но я не мог понять, каков наилучший способ получения данных, как только определенное время (скажем, 50 мс) прошло.

Я думаю о том, возможно, мне следует использовать некоторые другие технические данные? Любые предложения приветствуются.

1 Ответ

0 голосов
/ 08 января 2020

Используйте XRANGE myStream - + COUNT 1 для получения первой записи.

Используйте XREVRANGE myStream + - COUNT 1 для получения последней записи.

XINFO STREAM myStream также приносит первую и последнюю запись, но the документы говорят, что это O(log N).

Если вы используете метку времени в качестве идентификатора или в качестве поля, то вы можете вычислить разницу во времени.

Если вы используете Redis Автоматический идентификатор потоков (XADD myStream * ...), первая часть идентификатора - отметка времени UNIX в миллисекундах.

Принимая вышеизложенное, вы можете выполнить проверку атомарно с помощью сценария Lua:

EVAL "local first = redis.call('XRANGE', KEYS[1], '-', '+', 'COUNT', '1') local firstTime = {} if next(first) == nil then     return redis.error_reply('Stream is empty or key doesn`t exist') end for str in string.gmatch(first[1][1], '([^-]+)') do     table.insert(firstTime, tonumber(str)) end local last = redis.call('XREVRANGE', KEYS[1], '+', '-', 'COUNT', '1') local lastTime = {} for str in string.gmatch(last[1][1], '([^-]+)') do     table.insert(lastTime, tonumber(str)) end local ms = lastTime[1] - firstTime[1] if ms >= tonumber(ARGV[1]) then     return redis.call('XRANGE', KEYS[1], '-', '+') else     return redis.error_reply('Only '..ms..' ms') end" 1 myStream 50

Аргументы: numKeys(1 here) streamKey timeInMs(50 here): 1 myStream 50.

Здесь дружественный взгляд на скрипт Lua:

local first = redis.call('XRANGE', KEYS[1], '-', '+', 'COUNT', '1')
local firstTime = {}
if next(first) == nil then
    return redis.error_reply('Stream is empty or key doesn`t exist')
end
for str in string.gmatch(first[1][1], '([^-]+)') do
    table.insert(firstTime, tonumber(str))
end
local last = redis.call('XREVRANGE', KEYS[1], '+', '-', 'COUNT', '1')
local lastTime = {}
for str in string.gmatch(last[1][1], '([^-]+)') do
    table.insert(lastTime, tonumber(str))
end
local ms = lastTime[1] - firstTime[1]
if ms >= tonumber(ARGV[1]) then
    return redis.call('XRANGE', KEYS[1], '-', '+')
else
    return redis.error_reply('Only '..ms..' ms')
end

Возвращает:

  • (error) Stream is empty or key doesn`t exist
  • (error) Only 34 ms, если у нас нет требуемого времени
  • Фактический список записей, если истекло требуемое время между первым и последним сообщением .

Обязательно отметьте Введение в Redis Streams , чтобы ознакомиться с Redis Streams, и команду EVAL , чтобы узнать о * 1 048 * скриптов.

...