Вы можете заставить его работать, добавив преобразование Group By Key после оконного управления. Вы назначили ключи для записей, но на самом деле не сгруппировали их по ключам, а окно сеанса (которое работает по ключам) не знает, что эти события необходимо объединить.
Чтобы подтвердить это, я сделал воспроизводимый пример с некоторыми фиктивными данными в памяти (чтобы изолировать Pub / Sub от проблемы и иметь возможность быстрее ее протестировать). Все пять событий будут иметь одинаковый ключ или user_id
, но они будут «прибывать» последовательно на расстоянии 1, 2, 4 и 8 секунд друг от друга. Поскольку я использую session_gap
из 5 секунд, я ожидаю, что первые 4 элемента будут объединены в одном сеансе. 5-е событие займет 8 секунд после 4-го, поэтому оно должно быть перенесено на следующую сессию (разрыв более 5 с). Данные создаются так:
data = [{'user_id': 'Thanos', 'value': 'event_{}'.format(event), 'timestamp': time.time() + 2**event} for event in range(5)]
Мы используем beam.Create(data)
для инициализации конвейера и beam.window.TimestampedValue
для назначения «поддельных» временных отметок. Опять же, мы просто симулируем потоковое поведение с этим. После этого мы создаем пары ключ-значение благодаря полю user_id
, добавляем window.Sessions
и добавляем пропущенный шаг beam.GroupByKey()
. Наконец, мы регистрируем результаты в слегка измененной версии DebugPrinter
:. Теперь конвейер выглядит так:
events = (p
| 'Create Events' >> beam.Create(data) \
| 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp'])) \
| 'keyed_on_user_id' >> beam.Map(lambda x: (x['user_id'], x))
| 'user_session_window' >> beam.WindowInto(window.Sessions(session_gap),
timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW) \
| 'Group' >> beam.GroupByKey()
| 'debug_printer' >> beam.ParDo(DebugPrinter()))
, где DebugPrinter
:
class DebugPrinter(beam.DoFn):
"""Just prints the element with logging"""
def process(self, element, window=beam.DoFn.WindowParam):
for x in element[1]:
logging.info(">>> Received %s %s with window=%s", x['value'], x['timestamp'], window)
yield element
Если мы проверим это без группировки по ключу, мы получим такое же поведение:
INFO:root:>>> Received event_0 1554117323.0 with window=[1554117323.0, 1554117328.0)
INFO:root:>>> Received event_1 1554117324.0 with window=[1554117324.0, 1554117329.0)
INFO:root:>>> Received event_2 1554117326.0 with window=[1554117326.0, 1554117331.0)
INFO:root:>>> Received event_3 1554117330.0 with window=[1554117330.0, 1554117335.0)
INFO:root:>>> Received event_4 1554117338.0 with window=[1554117338.0, 1554117343.0)
Но после добавления окна теперь работают как положено. События с 0 по 3 объединяются в расширенное 12-секундное окно сеанса. Событие 4 принадлежит отдельному сеансу 5s.
INFO:root:>>> Received event_0 1554118377.37 with window=[1554118377.37, 1554118389.37)
INFO:root:>>> Received event_1 1554118378.37 with window=[1554118377.37, 1554118389.37)
INFO:root:>>> Received event_3 1554118384.37 with window=[1554118377.37, 1554118389.37)
INFO:root:>>> Received event_2 1554118380.37 with window=[1554118377.37, 1554118389.37)
INFO:root:>>> Received event_4 1554118392.37 with window=[1554118392.37, 1554118397.37)
Полный код здесь
Две дополнительные вещи, которые стоит упомянуть. Во-первых, даже при локальном запуске этого на одном компьютере с DirectRunner записи могут быть неупорядоченными (в моем случае event_3 обрабатывается до event_2). Это сделано с целью симулировать распределенную обработку, как описано здесь .
Последнее, что если вы получите трассировку стека, как это:
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'Write Results/Write/WriteImpl/WriteBundles']
понижение с 2.10.0 / 2.11.0 SDK до 2.9.0. См. Например ответ .