У меня есть входящие сообщения от Kakfa topi c с payload_data
в формате:
{
'event_time': 1586192028,
'event_id': u'123',
'field1': 'f12'
},
{
'event_time': 1586192038,
'event_id': u'124',
'field1': 'f12'
}
Мой конвейер луча следующий:
(
messages
| "MapToTuple"
>> beam.Map(lambda message: (message.payload_data['event_id'], message))
| "Window"
>> beam.WindowInto(
window.FixedWindows(60), # 60 seconds event time window
trigger=Repeatedly(
AfterAny(
# Early firing every 10 seconds or 10 messages whichever come first
AfterProcessingTime(10 * 1000),
AfterCount(10),
)
),
accumulation_mode=AccumulationMode.ACCUMULATING,
)
| "group" >> beam.GroupByKey()
| "Process" >> beam.ParDo(MyCustomPardo())
)
Мое намерение состоит в том, чтобы запускать триггер каждые 10 секунд или 10 сообщений, в зависимости от того, что наступит раньше. Но я получаю 1 сообщение за раз. Что такое GroupByKey
в этом случае и как я могу получить достижение every 10 secs of 10 messages
из конвейера?