Срабатывает триггер для ряда сообщений или через определенное время в apache луче - PullRequest
0 голосов
/ 06 апреля 2020

У меня есть входящие сообщения от Kakfa topi c с payload_data в формате:

{
    'event_time': 1586192028, 
    'event_id': u'123', 
    'field1': 'f12'
},
{
    'event_time': 1586192038, 
    'event_id': u'124', 
    'field1': 'f12'
}

Мой конвейер луча следующий:

(
    messages
    | "MapToTuple"
    >> beam.Map(lambda message: (message.payload_data['event_id'], message))
    | "Window"
    >> beam.WindowInto(
        window.FixedWindows(60),  # 60 seconds event time window
        trigger=Repeatedly(
            AfterAny(
                # Early firing every 10 seconds or 10 messages whichever come first
                AfterProcessingTime(10 * 1000),
                AfterCount(10),
            )
        ),
        accumulation_mode=AccumulationMode.ACCUMULATING,
    )
    | "group" >> beam.GroupByKey()
    | "Process" >> beam.ParDo(MyCustomPardo())
)

Мое намерение состоит в том, чтобы запускать триггер каждые 10 секунд или 10 сообщений, в зависимости от того, что наступит раньше. Но я получаю 1 сообщение за раз. Что такое GroupByKey в этом случае и как я могу получить достижение every 10 secs of 10 messages из конвейера?

...