Я пытаюсь запустить конвейер с помощью apache_beam (в конце получит DataFlow).
Конвейер должен выглядеть следующим образом:
Я форматирую данные из PubSub, записываю необработанные результаты в Firestore, запускаю модель ML, и после получения результатов из модели ML я хочу обновить Firestore с помощью идентификатора, полученного из сначала напишите в FS.
Код конвейера в целом выглядит следующим образом:
with beam.Pipeline(options=options) as p:
# read and format
formated_msgs = (
p
| "Read from PubSub" >> LoadPubSubData(known_args.topic)
)
# write the raw results to firestore
write_results = (
formated_msgs
| "Write to FS" >> beam.ParDo(WriteToFS())
| "Key FS" >> beam.Map(lambda fs: (fs["record_uuid"], fs))
)
# Run the ML model
ml_results = (
formated_msgs
| "ML" >> ML()
| "Key ML" >> beam.Map(lambda row: (row["record_uuid"], row))
)
# Merge by key and update - HERE IS THE PROBLEM
(
(write_results, ml_results) # I want to have the data from both merged by the key at this point
| "group" >> beam.CoGroupByKey()
| "log" >> beam.ParDo(LogFn())
)
Я пробовал так много способов, но, похоже, не могу найти правильный способ сделать это , Есть идеи?
--- обновление 1 ---
Проблема в том, что в строке журнала я ничего не получаю. Иногда я даже получаю тайм-аут на операцию. Может быть важно отметить, что вначале я передаю данные из PubSub.