Окна сессий в Apache Beam с питоном - PullRequest
0 голосов
/ 20 марта 2019

У меня есть поток пользовательских событий.Я сопоставил их с KV {userId, event} и назначенными временными метками.

Это для работы в потоковом режиме.Я хотел бы иметь возможность создать следующий результат ввода-вывода:

разрыв окна сеанса = 1

  • input: user=1, timestamp=1, event=a
  • input: user=2, timestamp=2, event=a
  • вход: user=2, timestamp=3, event=a
  • вход: user=1, timestamp=2, event=b
  • время: lwm=3
  • выход: user=1, [ { event=a, timestamp=1 }, { event=b, timestamp=2 } ]
  • время: lwm=4
  • вывод: user=2, [ { event=a, timestamp=2 }, { event=a, timestamp=3 } ]

Так что я могу написать свою функцию, чтобы уменьшить список событий в окне сеанса для пользователя, а такжевремя начала и окончания сеанса.

Как мне написать это?(Если вы отвечаете: «посмотрите на примеры», это неверный ответ, потому что они никогда не передают список событий в редуктор с окном в качестве параметра)

1 Ответ

1 голос
/ 02 апреля 2019

Если я правильно понимаю, это будет продолжение этого вопроса и, естественно, будет достигнуто путем добавления шага Группировка по ключу, как я предлагаю в своем решении.

Итакссылаясь на мое предыдущее объяснение и фокусируясь здесь только на изменениях, если у нас есть конвейер, подобный этому:

events = (p
  | 'Create Events' >> beam.Create(user1_data + user2_data) \
  | 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp'])) \
  | 'keyed_on_user_id'      >> beam.Map(lambda x: (x['user_id'], x))
  | 'user_session_window'   >> beam.WindowInto(window.Sessions(session_gap),
                                             timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW) \
  | 'Group' >> beam.GroupByKey() \
  | 'analyze_session'         >> beam.ParDo(AnalyzeSession()))

Теперь элементы расположены так, как вы описали в описании вопроса, поэтому мы можем просто войти в нихAnalyzeSession:

class AnalyzeSession(beam.DoFn):
  """Prints per session information"""
  def process(self, element, window=beam.DoFn.WindowParam):
    logging.info(element)
    yield element

для получения желаемых результатов:

INFO:root:('Groot', [{'timestamp': 1554203778.904401, 'user_id': 'Groot', 'value': 'event_0'}, {'timestamp': 1554203780.904401, 'user_id': 'Groot', 'value': 'event_1'}])
INFO:root:('Groot', [{'timestamp': 1554203786.904402, 'user_id': 'Groot', 'value': 'event_2'}])
INFO:root:('Thanos', [{'timestamp': 1554203792.904399, 'user_id': 'Thanos', 'value': 'event_4'}])
INFO:root:('Thanos', [{'timestamp': 1554203784.904398, 'user_id': 'Thanos', 'value': 'event_3'}, {'timestamp': 1554203777.904395, 'user_id': 'Thanos', 'value': 'event_0'}, {'timestamp': 1554203778.904397, 'user_id': 'Thanos', 'value': 'event_1'}, {'timestamp': 1554203780.904398, 'user_id': 'Thanos', 'value': 'event_2'}])

Если вы хотите избежать избыточной информации, такой как user_id и timestamp как частьзначения они могут быть удалены в шаге Map.В соответствии с полным вариантом использования (то есть уменьшением агрегированных событий на уровне сеанса) мы можем делать такие вещи, как подсчет количества событий или продолжительности сеанса, примерно так:

class AnalyzeSession(beam.DoFn):
  """Prints per session information"""
  def process(self, element, window=beam.DoFn.WindowParam):
    user = element[0]
    num_events = str(len(element[1]))
    window_end = window.end.to_utc_datetime()
    window_start = window.start.to_utc_datetime()
    session_duration = window_end - window_start

    logging.info(">>> User %s had %s event(s) in %s session", user, num_events, session_duration)

    yield element

, что для моегоНапример, выведет следующее:

INFO:root:>>> User Groot had 2 event(s) in 0:00:07 session
INFO:root:>>> User Groot had 1 event(s) in 0:00:05 session
INFO:root:>>> User Thanos had 4 event(s) in 0:00:12 session
INFO:root:>>> User Thanos had 1 event(s) in 0:00:05 session

Полный код здесь

...