Хит проблемы с помощью GroupByKey, и я считаю, что это отследить до проблемы типа. Я смотрел на это некоторое время и следил за несколькими следами стека, но мне не ясно, почему следующее неверно
@beam.typehints.with_output_types(beam.typehints.Tuple[long, float])
class MultiMap(beam.DoFn):
def process(self, element):
items = element.split(',')
print items
r = (long(items[0]), float(items[10]))
print r
return r
pipeline = beam.Pipeline()
pcoll = pipeline | 'start' >> beam.Create(['14172425165068797305,3,0,3,0.07,0.36,1,4,4,3705.00765154,0.235002550513','2746375035268210383,3,0,3,0.07,0.36,2,5,5,3789.1391067,0.263046368899','16101396351712676789,3,0,3,0.07,0.37,1,4,3,3639.26112282,0.213087040939'])
multi = pcoll | "Multimap" >> beam.ParDo(MultiMap()).with_output_types(beam.typehints.Tuple[long, float])
Используя DirectRunner, я получаю следующее исключение.
File "apache_beam/runners/worker/operations.py", line 227, in apache_beam.runners.worker.operations.ReadOperation.start File "apache_beam/runners/worker/operations.py", line 228, in apache_beam.runners.worker.operations.ReadOperation.start File "apache_beam/runners/worker/operations.py", line 229, in apache_beam.runners.worker.operations.ReadOperation.start File "apache_beam/runners/worker/operations.py", line 238, in apache_beam.runners.worker.operations.ReadOperation.start File "apache_beam/runners/worker/operations.py", line 159, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 85, in apache_beam.runners.worker.operations.ConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 392, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 393, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 488, in apache_beam.runners.common.DoFnRunner.receive File "apache_beam/runners/common.py", line 496, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 537, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/python2.7/dist-packages/six.py", line 737, in raise_from
raise value TypeError: 'long' object is not subscriptable [while running 'Multimap']
Необходимо выяснить, почему не удается передать выходные данные ParDo в GroupByKey.