Медленно меняющийся кеш поиска из BigQuery - поток данных Python Streaming SDK - PullRequest
1 голос
/ 08 марта 2019

Я пытаюсь следовать шаблону разработки для медленно изменяющегося кэша поиска (https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1) для потокового конвейера с использованием Python SDK для Apache Beam в DataFlow.

Наша справочная таблица для поискового кэшасидит в BigQuery, и мы можем читать и передавать его как побочный ввод в операцию ParDo, но он не обновляется независимо от того, как мы настроили триггер / окна.

class FilterAlertDoFn(beam.DoFn):
  def process(self, element, alertlist):

    print len(alertlist)
    print alertlist

    …  # function logic

alert_input = (p | beam.io.Read(beam.io.BigQuerySource(query=ALERT_QUERY))
                        | ‘alert_side_input’ >> beam.WindowInto(
                            beam.window.GlobalWindows(),
                            trigger=trigger.RepeatedlyTrigger(trigger.AfterWatermark(
                                late=trigger.AfterCount(1)
                            )),
                            accumulation_mode=trigger.AccumulationMode.ACCUMULATING
                          )
                       | beam.Map(lambda elem: elem[‘SOMEKEY’])
)

...


main_input | ‘alerts’ >> beam.ParDo(FilterAlertDoFn(), beam.pvalue.AsList(alert_input))

На основе страницы ввода / вывода (https://beam.apache.org/documentation/io/built-in/) он говорит, что Python SDK поддерживает потоковую передачу только для BigQuery Sink, означает ли это, что чтения BQ являются ограниченным источником и, следовательно, не могут быть обновленыв этом методе?

Попытка установить неглобальные окна на источнике приводит к пустой PC-коллекции в боковом вводе.


ОБНОВЛЕНИЕ : При попыткереализовать стратегию, предложенную в ответе Пабло, операция ParDo, использующая боковой ввод, не будет выполняться.

Существует один источник ввода, который идет на два выхода, один из которых затем используетБоковой ввод.Non-SideInput все еще достигнет своего пункта назначения, и конвейер SideInput не будет вводить FilterAlertDoFn ().

Подставляя боковой ввод для фиктивного значения, конвейер войдет в функцию.Возможно, оно ожидает подходящего окна, которого не существует?

С тем же FilterAlertDoFn (), что и выше, мой side_input и call теперь выглядят так:

def refresh_side_input(_):
   query = 'select col from table'
   client = bigquery.Client(project='gcp-project')
   query_job = client.query(query)

   return query_job.result()


trigger_input = ( p | 'alert_ref_trigger' >> beam.io.ReadFromPubSub(
            subscription=known_args.trigger_subscription))


bigquery_side_input = beam.pvalue.AsSingleton((trigger_input
         | beam.WindowInto(beam.window.GlobalWindows(),
                           trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                           accumulation_mode=trigger.AccumulationMode.DISCARDING)
         | beam.Map(refresh_side_input)
        ))

...

# Passing this as side input doesn't work
main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), bigquery_side_input)

# Passing dummy variable as side input does work
main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), [1])

Я попыталсянесколько разных версий refresh_side_input (), они сообщают об ожидаемом результате при проверке возврата внутри функции.


ОБНОВЛЕНИЕ 2:

Я внес некоторые незначительные измененияк коду Пабло, и я получаю то же поведение - DoFn никогда не выполняется.

В приведенном ниже примере я буду видеть «in_load_conversion_data» всякий раз, когда я публикую в some_other_topic , но никогда не увидит «in_DoFn»при публикации на some_topic

import apache_beam as beam
import apache_beam.transforms.window as window

from apache_beam.transforms import trigger
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions


def load_my_conversion_data():
    return {'EURUSD': 1.1, 'USDMXN': 4.4}


def load_conversion_data(_):
    # I will suppose that these are currency conversions. E.g.
    # {'EURUSD': 1.1, 'USDMXN' 20,}
    print 'in_load_conversion_data'
    return load_my_conversion_data()


class ConvertTo(beam.DoFn):
    def __init__(self, target_currency):
        self.target_currency = target_currency

    def process(self, elm, rates):
        print 'in_DoFn'
        elm = elm.attributes
        if elm['currency'] == self.target_currency:
            yield elm
        elif ' % s % s' % (elm['currency'], self.target_currency) in rates:
            rate = rates[' % s % s' % (elm['currency'], self.target_currency)]
            result = {}.update(elm).update({'currency': self.target_currency,
            'value': elm['value']*rate})
             yield result
         else:
             return  # We drop that value


pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=pipeline_options)

some_topic = 'projects/some_project/topics/some_topic'
some_other_topic = 'projects/some_project/topics/some_other_topic'

with beam.Pipeline(options=pipeline_options) as p:

    table_pcv = beam.pvalue.AsSingleton((
      p
      | 'some_other_topic' >>  beam.io.ReadFromPubSub(topic=some_other_topic,  with_attributes=True)
      | 'some_other_window' >> beam.WindowInto(window.GlobalWindows(),
                        trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                        accumulation_mode=trigger.AccumulationMode.DISCARDING)
      | beam.Map(load_conversion_data)))


    _ = (p | 'some_topic' >> beam.io.ReadFromPubSub(topic=some_topic)
         | 'some_window' >> beam.WindowInto(window.FixedWindows(1))
         | beam.ParDo(ConvertTo('USD'), rates=table_pcv))

1 Ответ

1 голос
/ 13 марта 2019

Как вы хорошо заметили, Java SDK позволяет вам использовать больше потоковых утилит, таких как таймеры и состояния. Эти утилиты помогают реализовать такие конвейеры.

В Python SDK отсутствуют некоторые из этих утилит, а именно таймеры. По этой причине нам нужно использовать хак, где перезагрузка бокового ввода может быть инициирована путем вставки сообщений в наш some_other_topic в PubSub.

Это также означает, что вы должны вручную выполнить поиск в BigQuery. Вероятно, вы можете использовать класс apache_beam.io.gcp.bigquery_tools.BigQueryWrapper для выполнения поиска непосредственно в BigQuery.

Вот пример конвейера, который обновляет некоторые данные конвертации валюты. Я не проверял это, но я на 90% уверен, что он будет работать только с несколькими настройками. Дайте мне знать, если это поможет.

pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

def load_conversion_data(_):
  # I will suppose that these are currency conversions. E.g. 
  # {‘EURUSD’: 1.1, ‘USDMXN’ 20, …}
  return external_service.load_my_conversion_data()

table_pcv = beam.pvalue.AsSingleton((
  p
  | beam.io.gcp.ReadFromPubSub(topic=some_other_topic)
  | WindowInto(window.GlobalWindow(),
               trigger=trigger.Repeatedly(trigger.AfterCount(1),
               accumulation_mode=trigger.AccumulationMode.DISCARDING)
  | beam.Map(load_conversion_data)))


class ConvertTo(beam.DoFn):
  def __init__(self, target_currency):
    self.target_currenct = target_currency

  def process(self, elm, rates):
    if elm[‘currency’] == self.target_currency:
      yield elm
    elif ‘%s%s’ % (elm[‘currency’], self.target_currency) in rates:
      rate = rates[‘%s%s’ % (elm[‘currency’], self.target_currency)]
      result = {}.update(elm).update({‘currency’: self.target_currency,
                                      ‘value’: elm[‘value’]*rate})
      yield result
    else:
      return  # We drop that value


_ = (p 
     | beam.io.gcp.ReadFromPubSub(topic=some_topic)
     | beam.WindowInto(window.FixedWindows(1))
     | beam.ParDo(ConvertTo(‘USD’), rates=table_pcv))
...