Я пытаюсь следовать шаблону разработки для медленно изменяющегося кэша поиска (https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1) для потокового конвейера с использованием Python SDK для Apache Beam в DataFlow.
Наша справочная таблица для поискового кэшасидит в BigQuery, и мы можем читать и передавать его как побочный ввод в операцию ParDo, но он не обновляется независимо от того, как мы настроили триггер / окна.
class FilterAlertDoFn(beam.DoFn):
def process(self, element, alertlist):
print len(alertlist)
print alertlist
… # function logic
alert_input = (p | beam.io.Read(beam.io.BigQuerySource(query=ALERT_QUERY))
| ‘alert_side_input’ >> beam.WindowInto(
beam.window.GlobalWindows(),
trigger=trigger.RepeatedlyTrigger(trigger.AfterWatermark(
late=trigger.AfterCount(1)
)),
accumulation_mode=trigger.AccumulationMode.ACCUMULATING
)
| beam.Map(lambda elem: elem[‘SOMEKEY’])
)
...
main_input | ‘alerts’ >> beam.ParDo(FilterAlertDoFn(), beam.pvalue.AsList(alert_input))
На основе страницы ввода / вывода (https://beam.apache.org/documentation/io/built-in/) он говорит, что Python SDK поддерживает потоковую передачу только для BigQuery Sink, означает ли это, что чтения BQ являются ограниченным источником и, следовательно, не могут быть обновленыв этом методе?
Попытка установить неглобальные окна на источнике приводит к пустой PC-коллекции в боковом вводе.
ОБНОВЛЕНИЕ : При попыткереализовать стратегию, предложенную в ответе Пабло, операция ParDo, использующая боковой ввод, не будет выполняться.
Существует один источник ввода, который идет на два выхода, один из которых затем используетБоковой ввод.Non-SideInput все еще достигнет своего пункта назначения, и конвейер SideInput не будет вводить FilterAlertDoFn ().
Подставляя боковой ввод для фиктивного значения, конвейер войдет в функцию.Возможно, оно ожидает подходящего окна, которого не существует?
С тем же FilterAlertDoFn (), что и выше, мой side_input и call теперь выглядят так:
def refresh_side_input(_):
query = 'select col from table'
client = bigquery.Client(project='gcp-project')
query_job = client.query(query)
return query_job.result()
trigger_input = ( p | 'alert_ref_trigger' >> beam.io.ReadFromPubSub(
subscription=known_args.trigger_subscription))
bigquery_side_input = beam.pvalue.AsSingleton((trigger_input
| beam.WindowInto(beam.window.GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterCount(1)),
accumulation_mode=trigger.AccumulationMode.DISCARDING)
| beam.Map(refresh_side_input)
))
...
# Passing this as side input doesn't work
main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), bigquery_side_input)
# Passing dummy variable as side input does work
main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), [1])
Я попыталсянесколько разных версий refresh_side_input (), они сообщают об ожидаемом результате при проверке возврата внутри функции.
ОБНОВЛЕНИЕ 2:
Я внес некоторые незначительные измененияк коду Пабло, и я получаю то же поведение - DoFn никогда не выполняется.
В приведенном ниже примере я буду видеть «in_load_conversion_data» всякий раз, когда я публикую в some_other_topic , но никогда не увидит «in_DoFn»при публикации на some_topic
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.transforms import trigger
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
def load_my_conversion_data():
return {'EURUSD': 1.1, 'USDMXN': 4.4}
def load_conversion_data(_):
# I will suppose that these are currency conversions. E.g.
# {'EURUSD': 1.1, 'USDMXN' 20,}
print 'in_load_conversion_data'
return load_my_conversion_data()
class ConvertTo(beam.DoFn):
def __init__(self, target_currency):
self.target_currency = target_currency
def process(self, elm, rates):
print 'in_DoFn'
elm = elm.attributes
if elm['currency'] == self.target_currency:
yield elm
elif ' % s % s' % (elm['currency'], self.target_currency) in rates:
rate = rates[' % s % s' % (elm['currency'], self.target_currency)]
result = {}.update(elm).update({'currency': self.target_currency,
'value': elm['value']*rate})
yield result
else:
return # We drop that value
pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=pipeline_options)
some_topic = 'projects/some_project/topics/some_topic'
some_other_topic = 'projects/some_project/topics/some_other_topic'
with beam.Pipeline(options=pipeline_options) as p:
table_pcv = beam.pvalue.AsSingleton((
p
| 'some_other_topic' >> beam.io.ReadFromPubSub(topic=some_other_topic, with_attributes=True)
| 'some_other_window' >> beam.WindowInto(window.GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterCount(1)),
accumulation_mode=trigger.AccumulationMode.DISCARDING)
| beam.Map(load_conversion_data)))
_ = (p | 'some_topic' >> beam.io.ReadFromPubSub(topic=some_topic)
| 'some_window' >> beam.WindowInto(window.FixedWindows(1))
| beam.ParDo(ConvertTo('USD'), rates=table_pcv))