Текущая ситуация
Типичным объектом этого конвейера является чтение из pub / sub полезной нагрузки с геоданными, затем эти данные преобразуются, анализируются и, наконец, возвращаются, если условие истинно или ложно
with beam.Pipeline(options=pipeline_options) as p:
raw_data = (p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(
subscription='projects/XXX/subscriptions/YYY'))
geo_data = (raw_data
| 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s)))
def GeoDataIngestion(string_input):
<...>
return True or False
Желаемая ситуация 1
Если результат GeoDataIngestion равен true, тогда raw_data будет сохранен в большом запросе
geo_data = (raw_data
| 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s))
| 'Evaluate condition' >> beam.Map(lambda s: Condition(s))
)
def Condition(condition):
if condition:
<...WriteToBigQuery...>
#The class I used before to store raw_data without depending on evaluate condition:
class WriteToBigQuery(beam.PTransform):
def expand(self, pcoll):
return (
pcoll
| 'Format' >> beam.ParDo(FormatBigQueryFn())
| 'Write to BigQuery' >> beam.io.WriteToBigQuery(
'XXX',
schema=TABLE_SCHEMA,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
Желаемая ситуация 2
Вместо того, чтобы хранить данные в BigQuery, было бы также хорошо отправить в pub / sub
def Condition(condition):
if condition:
<...SendToPubSub(Topic1)...>
else:
<...SendToPubSub(Topic2)...>
Здесь проблема состоит в том, чтобы установить тему в зависимости от результата условия, потому что я не могу передать тему как параметр в конвейере
| beam.io.WriteStringsToPubSub(TOPIC)
Ни в одной функции / классе
Вопрос
Как я могу это сделать?
Как / где я должен вызвать WriteToBigQuery для хранения необработанных данных PCollection, если результат условия Evaluate равен true?