Как предоставить параметры для составного преобразования в Apache Beam? - PullRequest
0 голосов
/ 19 декабря 2018

Я использую Python SDK Apache Beam.

У меня есть несколько шагов преобразования, и я хочу сделать их многократно используемыми, что указывает на то, что я должен написать собственное составное преобразование, например:

class MyCompositeTransform(beam.PTransform):
def expand(self, pcoll, arg1, kwarg1=u'default'):
    result = (pcoll
              | 'Step 1' >> beam.Map(lambda f: SomeFn(f, arg1))
              | 'Last step' >> beam.Map(lambda f: SomeOtherFn(f, kwarg1))
              )
    return result

Я хочу предоставить некоторые дополнительные параметры arg1 и kwarg1, которые необходимы для других преобразований внутри.Но я не знаю, является ли это верным способом и как его использовать в конвейере.

Может кто-нибудь указать мне направление?

1 Ответ

0 голосов
/ 27 декабря 2018

Как правило, вы не можете динамически передавать дополнительные параметры для преобразования во время выполнения, как вы описали.Когда вы запускаете программу контроллера, которая создает конвейер, структура конвейера сериализуется, отправляется и затем выполняется параллельно с парком рабочих, которые не имеют доступа к вашей программе контроллера, они только получают структуру и фактическуюкод вашего ParDos.

Один из способов динамической параметризации выполнения - предоставить дополнительные данные в качестве дополнительных входных данных, например, создать еще один PCollection, заполненный значениями параметров, и затем объединить его сглавная PCollection.Например, используя боковые входы или CoGroupByKey.

Если вы смотрите на Cloud Dataflow, то вы можете рассмотреть использование шаблонов pipeс ValueProviders, но не уверен, что они доступны в питонах или не-потоках данных.

...