Этот вопрос больше относится к пониманию синтаксиса для подключения конвейера gcp в apache beam. вот как настроен мой текущий конвейер
options = dataflow_options(project_id=project_id, topic_name=topic_name, job_name=job_name)
p = apache_beam.Pipeline(options=options)
(p
| 'read pubusb' >> apache_beam.io.ReadFromPubSub(topic=topic_path, with_attributes=True)
| 'decode the message' >> apache_beam.ParDo(mydecoder())
| 'persist to db' >> apache_beam.io.WriteToBigQuery(
output_table,
create_disposition=apache_beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=apache_beam.io.BigQueryDisposition.WRITE_APPEND
))
p.run()
С этим я могу создать конвейер, который выглядит примерно так:
Теперь, что я действительно хочу сделать (учитывая, что мой декодер один и тот же), это подключить несколько пабов к одному и тому же декодеру, т. Е.
Как могу ли я достичь этого в apache beam
Несколько вещей, которые я забыл упомянуть
- Все темы в основном являются потоком байтов.
- Не существует общего ключа между данными при чтении из тем
- Каждая топи c будет иметь разные логики c для декодирования
Я смотрел на CoGroupby
, но это нужен общий ключ.