Я видел несколько похожих вопросов по этой проблеме, которые предполагают, что низкая пропускная способность PubSub может вызвать проблемы; тем не менее, у меня более чем достаточно данных, поступающих к вещам pu sh ...
Это потоковый конвейер Python, считывающий данные из PubSub с конечной целью записи записей в Redis (Memorystore) для использования в качестве кэша.
with beam.Pipeline(options=pipeline_options) as p:
windowed_history_events = (p
| "Read input from PubSub" >> beam.io.ReadFromPubSub(subscription=known_args.subscription)
| "Parse string message to dict" >> beam.ParDo(ParseMessageToDict())
| "Filter non-page views" >> beam.Filter(is_page_view)
| "Create timestamp KV" >> beam.ParDo(CreateTimestampKV())
| "Window for writes" >> beam.WindowInto(
window.GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterCount(10)),
accumulation_mode=trigger.AccumulationMode.DISCARDING
)
| "Get user and content ID" >> beam.ParDo(ParseMessageToKV())
| "Group by user ID" >> beam.GroupByKey()
| "Create timestamp KV2" >> beam.ParDo(TmpDOFN())
| "Push content history to Memorystore" >> beam.ParDo(
ConnectToRedis(known_args.host, known_args.port))
)
Функция TmpDoFN () после шага GroupByKey сейчас просто в качестве шага отладки - она просто печатает сообщения, чтобы убедиться, что что-то проходит через них:
class TmpDOFN(beam.DoFn):
def process(self, message):
print(message)
yield message
Однако это никогда не вызывается и ничего не печатается (и точка отладки PyCharm никогда не срабатывает). Насколько я понимаю, оконная функция / триггер, который я настроил в данный момент, должна просто выводить каждые 10 сообщений, которые затем группируются и передаются на следующий шаг.
Если я удаляю шаг GroupByKey, сообщения печатаются как и ожидалось, конвейер продолжается ..
Я пробовал это с помощью FixedWindow ранее и столкнулся с той же проблемой.
Есть идеи?
Спасибо