Хочу Я хочу
- Пишите в BigQuery и после этого читайте ту же таблицу из BigQuery, чтобы узнать, есть ли обновления
Другой вопрос с похожей проблемой:
В этом вопросе пользователь хочет запустить два отдельных конвейера, один за другим,Я полагаю, это невозможно, если один из них является потоковым конвейером, не так ли?
В другом вопросе ответ состоял в том, чтобы запустить его в другомтрубопроводы, но у меня такая же проблема.В ответе, который они сказали к тому времени, другого пути не было, а есть ли другое в настоящее время?
Данные о моем конвейере
потоковый конвейер
код, который я хочу переписать:
| 'Write to BQ' >> WriteToBigQuery('table',SCHEMA)
| 'Read from BQ' >> beam.io.Read(beam.io.BigQuerySource('table')))
вывод этого кода:
Traceback (most recent call last):
run()
line 185, in run
| 'Read from BQ' >> beam.io.Read(beam.io.BigQuerySource('table'))
line 111, in __or__
return self.pipeline.apply(ptransform, self)
line 467, in apply
label or transform.label)
line 477, in apply
return self.apply(transform, pvalueish)
line 513, in apply
pvalueish_result = self.runner.apply(transform, pvalueish)
line 193, in apply
return m(transform, input)
line 199, in apply_PTransform
return transform.expand(input)
line 850, in expand
assert isinstance(pbegin, pvalue.PBegin)
AssertionError
Какой лучший способ сделать это?