Использование вывода ToList в качестве входных данных для AsSingleton или AsList в Apache Beam (python) - PullRequest
4 голосов
/ 14 апреля 2019

Я получаю неожиданную ошибку, когда пытаюсь использовать выходные данные beam.combiners.ToList в качестве входных данных для beam.pvalue.AsSingleton или beam.pvalue.AsList для экспериментов с боковыми входами. Я мог использовать отдельные числа (например, среднее значение списка) в качестве побочного ввода, но для списков и словарей я получаю исключения. Для луча.pvalue.AsSingleton, я получаю:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-4-0c1df7400a03> in <module>
     15 chain_total = chain_1 | chain_2
     16 
---> 17 chain_1 | beam.Map(m, beam.pvalue.AsList(chain_2))
     18 
     19 chain_total | beam.Map(print)

~/.cache/pypoetry/virtualenvs/prototyping-with-tensorflow-py3.6/lib/python3.6/site-packages/apache_beam/pvalue.py in __init__(self, pcoll)
    297     self.pvalue = pcoll
    298     self._window_mapping_fn = sideinputs.default_window_mapping_fn(
--> 299         pcoll.windowing.windowfn)
    300 
    301   def _view_options(self):

AttributeError: '_ChainedPTransform' object has no attribute 'windowing'

Для beam.pvalue.AsList, я получаю:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-7-0c1df7400a03> in <module>
     15 chain_total = chain_1 | chain_2
     16 
---> 17 chain_1 | beam.Map(m, beam.pvalue.AsList(chain_2))
     18 
     19 chain_total | beam.Map(print)

~/.cache/pypoetry/virtualenvs/prototyping-with-tensorflow-py3.6/lib/python3.6/site-packages/apache_beam/pvalue.py in __init__(self, pcoll)
    297     self.pvalue = pcoll
    298     self._window_mapping_fn = sideinputs.default_window_mapping_fn(
--> 299         pcoll.windowing.windowfn)
    300 
    301   def _view_options(self):

AttributeError: '_ChainedPTransform' object has no attribute 'windowing'

Это код, который я использую

import apache_beam as beam


def m(x, u):
    print(u)
    return x


p = beam.Pipeline()

data_beam = Create(['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c'])

chain_1 = p | data_beam | beam.combiners.Count.PerElement()
chain_2 = beam.Map(lambda x: x[0]) | beam.combiners.ToList()

chain_total = chain_1 | chain_2

chain_1 | beam.Map(m, beam.pvalue.AsSingleton(chain_2))

chain_total | beam.Map(print)

p.run()

Замените beam.pvalue.AsSingleton на beam.pvalue.AsList, чтобы получить другую ошибку. Я использую Apache Beam python SDK версии 2.11.0.

1 Ответ

1 голос
/ 17 апреля 2019

PCollections - существительные в Beam, а PTransforms - глаголы. Когда вы начинаете конвейер, p = beam.Pipeline() - ваше единственное существительное. (Нет. Даже Create - это глагол.)

Применяя различные глаголы к этому существительному, вы можете создавать другие существительные в соответствии со следующим правилом:

  • new_noun = existing_noun | verb

Кажется, что основной источник путаницы возникает из-за того, что вы можете соединять глаголы вместе:

  • fancy_verb = verb1 | verb2

Хотя синтаксис в этих примерах выглядит примерно одинаково, возвращаемые значения имеют разные типы.

Основная проблема заключается в том, что только существительные могут рассматриваться как SideInputs.

В предоставленном примере chain_2 - это глагол, созданный путем объединения двух глаголов, и сообщение об ошибке подтверждает, что _ChainedPTransform (действительно, вид PTransform , как следует из названия) не может быть передано к любой из функций AsSideInput.

...