Я обрабатываю аналитические обращения в конвейере Apache Beam, написанном на python.Я использую FixedWindows из 10 минут, и я хотел бы вызвать предупреждение (например, с Cloud Pub / Sub), когда окно пусто.Итак, вот что я сделал:
ten_min_windows = day_hits | '10MinutesWindows' >> beam.WindowInto(
beam.window.FixedWindows(10 * 60))
ten_min_alerts = (ten_min_windows
| 'CountTransactions10Min' >> beam.CombineGlobally(count_transactions).without_defaults()
| 'KeepZeros10Min' >> beam.Filter(keep_zeros)
| 'ConvertToAlerts10Min' >> beam.ParDo(ToAlert()))
count_transactions фильтрует, чтобы сохранить только попадания транзакции, а затем возвращает длину полученного списка.keep_zeros возвращает true, если итоговая длина равна 0. Проблема в том, что если PCollection не содержала хитов транзакций, длина вообще не возвращается, и я получаю пустую PCollection из-за отсутствия значений по умолчанию.Кажется, я не могу вынуть без значений по умолчанию, поскольку это не разрешено при использовании неглобальных окон.
Я видел этот поток , советующий добавить фиктивный элемент в каждое окно, а затем проверьте, чтоколичество больше одного.
Это лучшее решение или есть лучший способ?
Как я могу это сделать, так как мне нужно будет иметь только один элемент на окно?Могу ли я закодировать это в конвейере напрямую или мне нужно запланировать ложное попадание, которое будет отправляться (например, через Cloud Pub / Sub) каждые 10 минут?