Решение, с которым я решил жить, фактически заключается в том, чтобы «не делать этого в SQL», отложив фактическую сессионизацию до скалярной функции, написанной на python.
--
-- The input parameter should be a comma delimited list of identifiers
-- Each identified should be a "power of 2" value, no lower than 1
-- (1, 2, 4, 8, 16, 32, 64, 128, etc, etc)
--
-- The input '1,2,4,2,1,1,4' will give the output '0001010'
--
CREATE OR REPLACE FUNCTION public.f_indentify_collision_indexes(arr varchar(max))
RETURNS VARCHAR(MAX)
STABLE AS
$$
stream = map(int, arr.split(','))
state = 0
collisions = []
item_id = 1
for item in stream:
if (state & item) == (item):
collisions.append('1')
state = item
else:
state |= item
collisions.append('0')
item_id += 1
return ''.join(collisions)
$$
LANGUAGE plpythonu;
ПРИМЕЧАНИЕ: я бы не использовал это, если бы были сотни типов событий;)
По сути, я передаю структуру данных событий в последовательности, а возвращаемая структура данных - это то, где начинаются новые сеансы.
Я выбрал фактические структуры данных, чтобы максимально упростить работу с SQL. (Не может быть лучшим, очень открытым для других идей.)
INSERT INTO
sessionised_event_stream
SELECT
device_id,
REGEXP_COUNT(
LEFT(
public.f_indentify_collision_indexes(
LISTAGG(event_type_id, ',')
WITHIN GROUP (ORDER BY session_event_sequence_id)
OVER (PARTITION BY device_id)
),
session_event_sequence_id::INT
),
'1',
1
) + 1
AS session_login_attempt_id,
session_event_sequence_id,
event_timestamp,
event_type_id,
event_data
FROM
(
SELECT
*,
ROW_NUMBER()
OVER (PARTITION BY device_id
ORDER BY event_timestamp, event_type_id, event_data)
AS session_event_sequence_id
FROM
event_stream
)
Утверждение детерминированного порядка событий (в случае событий, происходящих в одно и то же время и т. Д.)
ROW_NUMBER() OVER (stuff) AS session_event_sequence_id
Создать разделенный запятыми список идентификаторов event_type_id
LISTAGG(event_type_id, ',')
=> '1,2,4,8,2,1,4,1,4,4,1,1'
Использование python для определения границ
public.f_magic('1,2,4,8,2,1,4,1,4,4,1,1')
=> '000010010101'
Для первого события в последовательности подсчитайте число от 1 до первого знака в «границах» и включите его. Для второго события в последовательности подсчитайте число от 1 до второго знака в границах, включая и т. Д. И т. Д. И т. Д.
event 01 = 1
=> boundaries = '0'
=> session_id = 0
event 02 = 2
=> boundaries = '00'
=> session_id = 0
event 03 = 4
=> boundaries = '000'
=> session_id = 0
event 04 = 8
=> boundaries = '0000'
=> session_id = 0
event 05 = 2
=> boundaries = '00001'
=> session_id = 1
event 06 = 1
=> boundaries = '000010'
=> session_id = 1
event 07 = 4
=> boundaries = '0000100'
=> session_id = 1
event 08 = 1
=> boundaries = '00001001'
=> session_id = 2
event 09 = 4
=> boundaries = '000010010'
=> session_id = 2
event 10 = 4
=> boundaries = '0000100101'
=> session_id = 3
event 11 = 1
=> boundaries = '00001001010'
=> session_id = 3
event 12 = 1
=> boundaries = '000010010101'
=> session_id = 4
REGEXP_COUNT( LEFT('000010010101', session_event_sequence_id), '1', 1 )
В результате получается что-то не очень быстрое, но надежное и все же лучше, чем другие варианты, которые я пробовал. То, на что это похоже, это то, что (возможно, может быть, я не уверен, предостережение, предостережение) если в потоке 100 элементов, тогда LIST_AGG()
вызывается один раз, и вызывается UDF для Python 100 раз. Я могу быть не прав. Я видел, как Redshift делал худшие вещи;)
Псевдокод для того, что оказывается худшим вариантом.
Write some SQL that can find "the next session" from any given stream.
Run that SQL once storing the results in a temp table.
=> Now have the first session from every stream
Run it again using the temp table as an input
=> We now also have the second session from every stream
Keep repeating this until the SQL inserts 0 rows in to the temp table
=> We now have all the sessions from every stream
Время, затрачиваемое на вычисление каждого сеанса, было относительно небольшим, и на самом деле доминировали накладные расходы на повторные запросы к RedShift. Это также означало, что доминирующим фактором было «сколько сеансов находится в самом длинном потоке» (в моем случае 0,0000001% потоков было в 1000 раз длиннее среднего).
Версия Python на самом деле медленнее в большинстве отдельных случаев, но в ней не преобладают раздражающие выбросы. Это означало, что в целом версия Python завершилась примерно в 10 раз быстрее, чем версия «внешнего цикла», описанная здесь. Он также использовал загрузку большего количества ресурсов ЦП, но затраченное время является более важным фактором сейчас:)