«Sessioning» поток событий - PullRequest
       23

«Sessioning» поток событий

0 голосов
/ 30 октября 2018

У меня есть проблема, которая должна быть решена за пределами SQL, но из-за бизнес-ограничений необходимо решать в SQL.

  • Так что, пожалуйста, не говорите мне делать это при приеме данных, вне SQL, я хочу, но это не вариант ...


У меня есть поток событий с 4 основными свойствами ....

  • Исходное устройство
  • метка времени события
  • Тип события
  • «Полезная нагрузка» события (страшный VARCHAR, представляющий различные типы данных)


Что мне нужно сделать, это разбить поток на части (что я буду называть «сессиями») .

  • Каждый сеанс зависит от устройства (фактически PARTITION BY device_id)
  • Ни один сеанс не может содержать более одного события одного типа


Чтобы сократить примеры, я ограничу их включением только отметки времени и типа события ...

 timestamp | event_type          desired_session_id
-----------+------------        --------------------
     0     |     1                      0
     1     |     4                      0
     2     |     2                      0
     3     |     3                      0

     4     |     2                      1
     5     |     1                      1
     6     |     3                      1
     7     |     4                      1

     8     |     4                      2

     9     |     4                      3
    10     |     1                      3

    11     |     1                      4
    12     |     2                      4

Идеализированным конечным результатом может быть поворот окончательных результатов ...

device_id | session_id | event_type_1_timestamp | event_type_1_payload |  event_type_2_timestamp | event_type_2_payload ...

(Но это еще не сделано в камне, но мне нужно будет «знать», какие события составляют сессию, их временные метки и каковы их полезные нагрузки. Возможно, что просто добавив session_id достаточно столбца для ввода, пока я не «потеряю» другие свойства.)

* *
тысяча сорок-девять

Есть:

  • 12 дискретных типов событий
  • сотни тысяч устройств
  • сотни тысяч событий на устройство
  • «норма» около 6-8 событий на «сессию»
  • но иногда сеанс может иметь только 1 или все 12

Эти факторы означают, что полукартовые произведения и тому подобное, ммм, менее желательны, но, возможно, могут быть «единственным способом».


Я играл (в моей голове) с аналитическими функциями и процессами типа пробелов и островков, но так и не смог добраться до них. Я всегда возвращаюсь к месту, где мне «нужны» некоторые флаги, которые я могу переносить из строки в строку и сбрасывать их по мере необходимости ...

Псевдо-код, который не работает в SQL ...

flags = [0,0,0,0,0,0,0,0,0]
session_id = 0
for each row in stream
   if flags[row.event_id] == 0 then
       flags[row.event_id] = 1
   else
       session_id++
       flags = [0,0,0,0,0,0,0,0,0]
   row.session_id = session_id

Любое решение SQL для этого приветствуется, но вы получаете «бонусные баллы», если вы можете также учитывать события, «происходящие одновременно» ...

If multiple events happen at the same timestamp
  If ANY of those events are in the "current" session
    ALL of those events go in to a new session
  Else
    ALL of those events go in to the "current" session

If such a group of event include the same event type multiple times
  Do whatever you like
  I'll have had enough by that point...
  But set the session as "ambiguous" or "corrupt" with some kind of flag?

Ответы [ 3 ]

0 голосов
/ 30 октября 2018

UPD на основе обсуждения (не проверено / не проверено, грубая идея):

WITH
trailing_events as (
    select *, listagg(event_type::varchar,',') over (partition by device_id order by ts rows between previous 12 rows and current row) as events
    from tbl
)
,session_flags as (
    select *, f_get_session_flag(events) as session_flag
    from trailing_events
)
SELECT
 *
,sum(session_flag::int) over (partition by device_id order by ts) as session_id
FROM session_flags

, где f_get_session_flag равно

create or replace function f_get_session_flag(arr varchar(max))
returns boolean
stable as $$
stream = arr.split(',')
flags = [0,0,0,0,0,0,0,0,0,0,0,0]
is_new_session = False
for row in stream:
   if flags[row.event_id] == 0:
       flags[row.event_id] = 1
       is_new_session = False
   else:
       session_id+=1
       flags = [0,0,0,0,0,0,0,0,0,0,0,0]
       is_new_session = True
return is_new_session
$$ language plpythonu;

предыдущий ответ:

Флаги могут быть реплицированы как остаток от деления счетчика событий и 2:

1 -> 1%2 = 1
2 -> 2%2 = 0
3 -> 3%2 = 1
4 -> 4%2 = 0
5 -> 5%2 = 1
6 -> 6%2 = 0

и объединены в битовую маску (аналогично массиву flags в псевдокоде). Единственный сложный момент - когда нужно точно сбросить все флаги до нуля и инициировать новый идентификатор сеанса, но я могу подойти довольно близко. Если ваша примерная таблица называется t и в ней есть столбцы ts и type, скрипт может выглядеть следующим образом:

with
-- running count of the events
t1 as (
    select
     *
    ,sum(case when type=1 then 1 else 0 end) over (order by ts) as type_1_cnt
    ,sum(case when type=2 then 1 else 0 end) over (order by ts) as type_2_cnt
    ,sum(case when type=3 then 1 else 0 end) over (order by ts) as type_3_cnt
    ,sum(case when type=4 then 1 else 0 end) over (order by ts) as type_4_cnt
    from t
)
-- mask
,t2 as (
    select
     *
    ,case when type_1_cnt%2=0 then '0' else '1' end ||
     case when type_2_cnt%2=0 then '0' else '1' end ||
     case when type_3_cnt%2=0 then '0' else '1' end ||
     case when type_4_cnt%2=0 then '0' else '1' end as flags
    from t1
)
-- previous row's mask
,t3 as (
    select
     *
    ,lag(flags) over (order by ts) as flags_prev
    from t2
)
-- reset the mask if there is a switch from 1 to 0 at any position
,t4 as (
    select *
    ,case
        when (substring(flags from 1 for 1)='0' and substring(flags_prev from 1 for 1)='1')
        or (substring(flags from 2 for 1)='0' and substring(flags_prev from 2 for 1)='1')
        or (substring(flags from 3 for 1)='0' and substring(flags_prev from 3 for 1)='1')
        or (substring(flags from 4 for 1)='0' and substring(flags_prev from 4 for 1)='1')
        then '0000'
        else flags
     end as flags_override
    from t3
)
-- get the previous value of the reset mask and same event type flag for corner case 
,t5 as (
    select *
    ,lag(flags_override) over (order by ts) as flags_override_prev
    ,type=lag(type) over (order by ts) as same_event_type
    from t4
)
-- again, session ID is a switch from 1 to 0 OR same event type (that can be a switch from 0 to 1)
select
 ts
,type
,sum(case
 when (substring(flags_override from 1 for 1)='0' and substring(flags_override_prev from 1 for 1)='1')
        or (substring(flags_override from 2 for 1)='0' and substring(flags_override_prev from 2 for 1)='1')
        or (substring(flags_override from 3 for 1)='0' and substring(flags_override_prev from 3 for 1)='1')
        or (substring(flags_override from 4 for 1)='0' and substring(flags_override_prev from 4 for 1)='1')
        or same_event_type
        then 1
        else 0 end
 ) over (order by ts) as session_id
from t5
order by ts
;

Вы можете добавлять необходимые разделы и расширять до 12 типов событий, этот код предназначен для работы с предоставленной вами образцовой таблицей ... это не идеально, если вы запустите подзапросы, вы увидите, что флаги сбрасываются чаще чем нужно, но в целом это работает, за исключением углового случая для идентификатора сеанса 2 с одним типом события = 4 после окончания другого сеанса с тем же типом события = 4, поэтому я добавил простой поиск в same_event_type и использовал его как еще одно условие для нового идентификатора сеанса, надеюсь, это сработает с большим набором данных.

0 голосов
/ 05 ноября 2018

Решение, с которым я решил жить, фактически заключается в том, чтобы «не делать этого в SQL», отложив фактическую сессионизацию до скалярной функции, написанной на python.

--
-- The input parameter should be a comma delimited list of identifiers
-- Each identified should be a "power of 2" value, no lower than 1
-- (1, 2, 4, 8, 16, 32, 64, 128, etc, etc)
--
-- The input '1,2,4,2,1,1,4' will give the output '0001010'
--
CREATE OR REPLACE FUNCTION public.f_indentify_collision_indexes(arr varchar(max))
RETURNS VARCHAR(MAX)
STABLE AS
$$
    stream = map(int, arr.split(','))
    state = 0
    collisions = []
    item_id = 1
    for item in stream:
        if (state & item) == (item):
            collisions.append('1')
            state = item
        else:
            state |= item
            collisions.append('0')
        item_id += 1

    return ''.join(collisions)
$$
LANGUAGE plpythonu;

ПРИМЕЧАНИЕ: я бы не использовал это, если бы были сотни типов событий;)


По сути, я передаю структуру данных событий в последовательности, а возвращаемая структура данных - это то, где начинаются новые сеансы.

Я выбрал фактические структуры данных, чтобы максимально упростить работу с SQL. (Не может быть лучшим, очень открытым для других идей.)

INSERT INTO
    sessionised_event_stream
SELECT
    device_id,
    REGEXP_COUNT(
        LEFT(
            public.f_indentify_collision_indexes(
                LISTAGG(event_type_id, ',')
                    WITHIN GROUP (ORDER BY session_event_sequence_id)
                    OVER (PARTITION BY device_id)
            ),
            session_event_sequence_id::INT
        ),
        '1',
        1
    ) + 1
        AS session_login_attempt_id,
    session_event_sequence_id,
    event_timestamp,
    event_type_id,
    event_data
FROM
(
    SELECT
        *,
        ROW_NUMBER()
            OVER (PARTITION BY device_id
                      ORDER BY event_timestamp, event_type_id, event_data)
                AS session_event_sequence_id
    FROM
        event_stream
)
  1. Утверждение детерминированного порядка событий (в случае событий, происходящих в одно и то же время и т. Д.)
    ROW_NUMBER() OVER (stuff) AS session_event_sequence_id

  2. Создать разделенный запятыми список идентификаторов event_type_id
    LISTAGG(event_type_id, ',') => '1,2,4,8,2,1,4,1,4,4,1,1'

  3. Использование python для определения границ
    public.f_magic('1,2,4,8,2,1,4,1,4,4,1,1') => '000010010101'

  4. Для первого события в последовательности подсчитайте число от 1 до первого знака в «границах» и включите его. Для второго события в последовательности подсчитайте число от 1 до второго знака в границах, включая и т. Д. И т. Д. И т. Д.
    event 01 = 1 => boundaries = '0' => session_id = 0
    event 02 = 2 => boundaries = '00' => session_id = 0
    event 03 = 4 => boundaries = '000' => session_id = 0
    event 04 = 8 => boundaries = '0000' => session_id = 0
    event 05 = 2 => boundaries = '00001' => session_id = 1
    event 06 = 1 => boundaries = '000010' => session_id = 1
    event 07 = 4 => boundaries = '0000100' => session_id = 1
    event 08 = 1 => boundaries = '00001001' => session_id = 2
    event 09 = 4 => boundaries = '000010010' => session_id = 2
    event 10 = 4 => boundaries = '0000100101' => session_id = 3
    event 11 = 1 => boundaries = '00001001010' => session_id = 3
    event 12 = 1 => boundaries = '000010010101' => session_id = 4

    REGEXP_COUNT( LEFT('000010010101', session_event_sequence_id), '1', 1 )

В результате получается что-то не очень быстрое, но надежное и все же лучше, чем другие варианты, которые я пробовал. То, на что это похоже, это то, что (возможно, может быть, я не уверен, предостережение, предостережение) если в потоке 100 элементов, тогда LIST_AGG() вызывается один раз, и вызывается UDF для Python 100 раз. Я могу быть не прав. Я видел, как Redshift делал худшие вещи;)


Псевдокод для того, что оказывается худшим вариантом.

Write some SQL that can find "the next session" from any given stream.

Run that SQL once storing the results in a temp table.
=> Now have the first session from every stream

Run it again using the temp table as an input
=> We now also have the second session from every stream

Keep repeating this until the SQL inserts 0 rows in to the temp table
=> We now have all the sessions from every stream

Время, затрачиваемое на вычисление каждого сеанса, было относительно небольшим, и на самом деле доминировали накладные расходы на повторные запросы к RedShift. Это также означало, что доминирующим фактором было «сколько сеансов находится в самом длинном потоке» (в моем случае 0,0000001% потоков было в 1000 раз длиннее среднего).

Версия Python на самом деле медленнее в большинстве отдельных случаев, но в ней не преобладают раздражающие выбросы. Это означало, что в целом версия Python завершилась примерно в 10 раз быстрее, чем версия «внешнего цикла», описанная здесь. Он также использовал загрузку большего количества ресурсов ЦП, но затраченное время является более важным фактором сейчас:)

0 голосов
/ 30 октября 2018

Я не уверен на 100%, что это можно сделать в SQL. Но у меня есть идея для алгоритма, который может работать:

  • перечислить количество для каждого события
  • принять максимальное количество до каждой точки в качестве «группировки» для событий (это сеанс)

Итак:

select t.*,
       (max(seqnum) over (partition by device order by timestamp) - 1) as desired_session_id
from (select t.*,
             row_number() over (partition by device, event_type order by timestamp) as seqnum
      from t
     ) t;

EDIT:

Это слишком долго для комментария. У меня есть ощущение, что для этого требуется рекурсивный CTE (RBAR). Это потому, что вы не можете приземлиться в одной строке и посмотреть совокупную информацию или соседнюю информацию, чтобы определить, должна ли строка начать новый сеанс.

Конечно, в некоторых ситуациях это очевидно (скажем, предыдущая строка имеет такое же событие). И также возможно, что есть какой-то умный способ агрегирования предыдущих данных, который делает это возможным.

РЕДАКТИРОВАТЬ II:

Я не думаю, что это возможно без рекурсивных CTE (RBAR). Это не совсем математическое доказательство, но отсюда моя интуиция.

Представьте, что вы смотрите назад на 4 строки от текущего, и у вас есть:

1
2
1
2
1  <-- current row

Какой сеанс для этого? Это не определенно. Рассмотрим:

e     s           vs        e     s          
1     1                     2     1    <-- row not in look back
1     2                     1     1
2     2                     2     2
1     3                     1     2
2     3                     2     3
1     4                     1     3

Значение зависит от перехода назад. Очевидно, этот пример может быть расширен до первого события. Я не думаю, что есть способ «агрегировать» более ранние значения, чтобы различать эти два случая.

Проблема разрешима, если вы можете с уверенностью сказать, что данное событие является началом нового сеанса. Это, кажется, требует полного предварительного знания, по крайней мере, в некоторых случаях. Очевидно, есть случаи, когда это легко - например, два события подряд. Я подозреваю, однако, что это «меньшинство» таких последовательностей.

Тем не менее, вы не совсем застряли с RBAR по всей таблице, потому что у вас есть device_id для распараллеливания. Я не уверен, что ваша среда может это сделать, но в BQ или Postgres я бы:

  • Агрегируйте вдоль каждого устройства, чтобы создать массив структур с информацией о времени и событиях.
  • Выполните цикл по массивам один раз, возможно, используя пользовательский код.
  • Переназначить сеансы, присоединившись к исходной таблице или отключив логику.
...