Я получаю неожиданную ошибку, когда пытаюсь использовать выходные данные beam.combiners.ToList в качестве входных данных для beam.pvalue.AsSingleton или beam.pvalue.AsList для экспериментов с боковыми входами. Я мог использовать отдельные числа (например, среднее значение списка) в качестве побочного ввода, но для списков и словарей я получаю исключения. Для луча.pvalue.AsSingleton, я получаю:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-4-0c1df7400a03> in <module>
15 chain_total = chain_1 | chain_2
16
---> 17 chain_1 | beam.Map(m, beam.pvalue.AsList(chain_2))
18
19 chain_total | beam.Map(print)
~/.cache/pypoetry/virtualenvs/prototyping-with-tensorflow-py3.6/lib/python3.6/site-packages/apache_beam/pvalue.py in __init__(self, pcoll)
297 self.pvalue = pcoll
298 self._window_mapping_fn = sideinputs.default_window_mapping_fn(
--> 299 pcoll.windowing.windowfn)
300
301 def _view_options(self):
AttributeError: '_ChainedPTransform' object has no attribute 'windowing'
Для beam.pvalue.AsList, я получаю:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-7-0c1df7400a03> in <module>
15 chain_total = chain_1 | chain_2
16
---> 17 chain_1 | beam.Map(m, beam.pvalue.AsList(chain_2))
18
19 chain_total | beam.Map(print)
~/.cache/pypoetry/virtualenvs/prototyping-with-tensorflow-py3.6/lib/python3.6/site-packages/apache_beam/pvalue.py in __init__(self, pcoll)
297 self.pvalue = pcoll
298 self._window_mapping_fn = sideinputs.default_window_mapping_fn(
--> 299 pcoll.windowing.windowfn)
300
301 def _view_options(self):
AttributeError: '_ChainedPTransform' object has no attribute 'windowing'
Это код, который я использую
import apache_beam as beam
def m(x, u):
print(u)
return x
p = beam.Pipeline()
data_beam = Create(['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c'])
chain_1 = p | data_beam | beam.combiners.Count.PerElement()
chain_2 = beam.Map(lambda x: x[0]) | beam.combiners.ToList()
chain_total = chain_1 | chain_2
chain_1 | beam.Map(m, beam.pvalue.AsSingleton(chain_2))
chain_total | beam.Map(print)
p.run()
Замените beam.pvalue.AsSingleton на beam.pvalue.AsList, чтобы получить другую ошибку. Я использую Apache Beam python SDK версии 2.11.0.