В DirectRunner это работает нормально, но возникают ошибки при переключении на DataflowRunner. В основном мне нужно как-то объединить файлы, которые считываются, но как только я использую beam.combiners.ToList()
для объединения своих данных, это вызывает целый ряд проблем.
Пример кода:
def convert_to_dataframe(readable_file):
yield pd.read_csv(io.TextIOWrapper(readable_file.open()))
class merge_dataframes(beam.DoFn):
def process(self, element):
yield pd.concat(element).reset_index(drop=True)
with beam.Pipeline(options=pipeline_options) as p:
(p
| 'Match Files From GCS' >> beam.io.fileio.MatchFiles(raw_data_path)
| 'Read Files' >> beam.io.fileio.ReadMatches()
| 'Shuffle' >> beam.Reshuffle()
| 'Create DataFrames' >> beam.FlatMap(convert_to_dataframe)
| 'Combine To List' >> beam.combiners.ToList()
| 'Merge DataFrames' >> beam.ParDo(merge_dataframes())
| 'Apply Transformations' >> beam.ParDo(ApplyPipeline(creds_path=args.creds_path,
project_name=args.project_name,
feature_group_name=args.feature_group_name
))
| 'Write To GCS' >> beam.io.WriteToText(feature_data_path,
file_name_suffix='.csv',
shard_name_template='')
)
Ошибка:
"No objects to concatenate [while running 'Merge DataFrames']"
Я не понимаю эту ошибку, потому что часть, которая объединяет «Объединить в список», должна была создать список фреймов данных, которые затем были бы переданы на шаг «Слияние фреймов данных», что действительно имеет место, когда я использую DirectRunner.