Объединить PCollection с apache_beam - PullRequest
1 голос
/ 10 марта 2020

Я пытаюсь запустить конвейер с помощью apache_beam (в конце получит DataFlow).

Конвейер должен выглядеть следующим образом:

enter image description here

Я форматирую данные из PubSub, записываю необработанные результаты в Firestore, запускаю модель ML, и после получения результатов из модели ML я хочу обновить Firestore с помощью идентификатора, полученного из сначала напишите в FS.

Код конвейера в целом выглядит следующим образом:

with beam.Pipeline(options=options) as p:
    # read and format
    formated_msgs = (
        p
        | "Read from PubSub" >> LoadPubSubData(known_args.topic)
    )

    # write the raw results to firestore
    write_results = (
        formated_msgs
        | "Write to FS" >> beam.ParDo(WriteToFS())
        | "Key FS" >> beam.Map(lambda fs: (fs["record_uuid"], fs))
    )

    # Run the ML model
    ml_results = (
        formated_msgs
        | "ML" >> ML()
        | "Key ML" >> beam.Map(lambda row: (row["record_uuid"], row))
    )

    # Merge by key and update - HERE IS THE PROBLEM
    (
        (write_results, ml_results) # I want to have the data from both merged by the key at this point
        | "group" >> beam.CoGroupByKey()
        | "log" >> beam.ParDo(LogFn())
    )

Я пробовал так много способов, но, похоже, не могу найти правильный способ сделать это , Есть идеи?

--- обновление 1 ---

Проблема в том, что в строке журнала я ничего не получаю. Иногда я даже получаю тайм-аут на операцию. Может быть важно отметить, что вначале я передаю данные из PubSub.

1 Ответ

1 голос
/ 11 марта 2020

ОК, так что я наконец понял это. Единственное, чего мне не хватало, так это оконного управления, я полагаю, поскольку я передаю данные в потоковом режиме.

Итак, я добавил следующее:

with beam.Pipeline(options=options) as p:
    # read and format
    formated_msgs = (
        p
        | "Read from PubSub" >> LoadPubSubData(known_args.topic)
        | "Windowing" >> beam.WindowInto(window.FixedWindows(30))
    )
...