Как объединить проанализированные текстовые файлы в Apache -Beam DataFlow в Python? - PullRequest
1 голос
/ 05 февраля 2020

В DirectRunner это работает нормально, но возникают ошибки при переключении на DataflowRunner. В основном мне нужно как-то объединить файлы, которые считываются, но как только я использую beam.combiners.ToList() для объединения своих данных, это вызывает целый ряд проблем.

Пример кода:

def convert_to_dataframe(readable_file):
    yield pd.read_csv(io.TextIOWrapper(readable_file.open()))

class merge_dataframes(beam.DoFn):
    def process(self, element):
        yield pd.concat(element).reset_index(drop=True)

    with beam.Pipeline(options=pipeline_options) as p:

        (p
            | 'Match Files From GCS' >> beam.io.fileio.MatchFiles(raw_data_path)
            | 'Read Files' >> beam.io.fileio.ReadMatches()
            | 'Shuffle' >> beam.Reshuffle()
            | 'Create DataFrames' >> beam.FlatMap(convert_to_dataframe)
            | 'Combine To List' >> beam.combiners.ToList()
            | 'Merge DataFrames' >> beam.ParDo(merge_dataframes())
            | 'Apply Transformations' >> beam.ParDo(ApplyPipeline(creds_path=args.creds_path,
                                                                  project_name=args.project_name,
                                                                  feature_group_name=args.feature_group_name
                                                                  ))
            | 'Write To GCS' >> beam.io.WriteToText(feature_data_path,
                                                    file_name_suffix='.csv',
                                                    shard_name_template='')
         )

Ошибка:

"No objects to concatenate [while running 'Merge DataFrames']" 

Я не понимаю эту ошибку, потому что часть, которая объединяет «Объединить в список», должна была создать список фреймов данных, которые затем были бы переданы на шаг «Слияние фреймов данных», что действительно имеет место, когда я использую DirectRunner.

1 Ответ

1 голос
/ 14 февраля 2020

Принимая во внимание эту ошибку, я подозреваю, что MatchFiles на самом деле не соответствует чему-либо (например, из-за плохого filepattern) и, следовательно, вывод beam.combiners.ToList является пустым списком.

...