В моем проекте я хочу использовать потоковый конвейер в потоке данных Google для обработки сообщений Pub / Sub. При очистке входных данных я также хотел бы получить побочный ввод от BigQuery. Это создало проблему, из-за которой один из двух входов не работает.
Я установил в своем конвейере параметры потоковой передачи = True, что позволяет правильно обрабатывать входы Pub / Sub. Но BigQuery не совместим с потоковыми конвейерами (см. Ссылку ниже):
https://cloud.google.com/dataflow/docs/resources/faq#what_are_the_current_limitations_of_streaming_mode
Я получил эту ошибку: «ValueError: Cloud Pub / Sub в настоящее время доступен для использования только в потоковых конвейерах». Это понятно исходя из ограничений.
Но я только хочу использовать BigQuery в качестве побочного ввода для отображения данных на входящий поток данных Pub / Sub. Локально работает, но когда я пытаюсь запустить его в потоке данных, он возвращает ошибку.
Кто-нибудь нашел хороший обходной путь для этого?
РЕДАКТИРОВАТЬ: добавив рамки моего конвейера ниже для справки:
# Set all options needed to properly run the pipeline
options = PipelineOptions(streaming=True,
runner='DataflowRunner',
project=project_id)
p = beam.Pipeline(options = options)
n_tbl_src = (p
| 'Nickname Table Read' >> beam.io.Read(beam.io.BigQuerySource(
table = nickname_spec
)))
# This is the main Dataflow pipeline. This will clean the incoming dataset for importing into BQ.
clean_vote = (p
| beam.io.gcp.pubsub.ReadFromPubSub(topic = None,
subscription = 'projects/{0}/subscriptions/{1}'
.format(project_id, subscription_name),
with_attributes = True)
| 'Isolate Attributes' >> beam.ParDo(IsolateAttrFn())
| 'Fix Value Types' >> beam.ParDo(FixTypesFn())
| 'Scrub First Name' >> beam.ParDo(ScrubFnameFn())
| 'Fix Nicknames' >> beam.ParDo(FixNicknameFn(), n_tbl=AsList(n_tbl_src))
| 'Scrub Last Name' >> beam.ParDo(ScrubLnameFn()))
# The final dictionary will then be written to BigQuery for storage
(clean_vote | 'Write to BQ' >> beam.io.WriteToBigQuery(
table = bq_spec,
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
))
# Run the pipeline
p.run()