Модуль Apache Beam fileio
был недавно изменен с учетом несовместимых изменений, а библиотека beam_utils
еще не обновлена.
Я прошел вопрос предложенный @Pablo и исходным кодом beam_utils
(также написанным Pablo) для репликации поведения с использованием модуля filesystems
.
Ниже приведены две версии кода с использованием pandas для генерации DataFrame (s).
CSV, используемый для примера:
a,b
1,2
3,4
5,6
Чтение CSV и создание DataFrame со всеми его содержимое
import apache_beam as beam
import pandas as pd
import csv
import io
def create_dataframe(readable_file):
# Open a channel to read the file from GCS
gcs_file = beam.io.filesystems.FileSystems.open(readable_file)
# Read it as csv, you can also use csv.reader
csv_dict = csv.DictReader(io.TextIOWrapper(gcs_file))
# Create the DataFrame
dataFrame = pd.DataFrame(csv_dict)
print(dataFrame.to_string())
p = beam.Pipeline()
(p | beam.Create(['gs://my-bucket/my-file.csv'])
| beam.FlatMap(create_dataframe)
)
p.run()
Результирующий DataFrame
a b
0 1 2
1 3 4
2 5 6
Чтение CSV и создание DataFrames в другом преобразовании
def get_csv_reader(readable_file):
# Open a channel to read the file from GCS
gcs_file = beam.io.filesystems.FileSystems.open(readable_file)
# Return the csv reader
return csv.DictReader(io.TextIOWrapper(gcs_file))
p = beam.Pipeline()
(p | beam.Create(['gs://my-bucket/my-file.csv'])
| beam.FlatMap(get_csv_reader)
| beam.Map(lambda x: pd.DataFrame([x])) # Create the DataFrame from each csv row
| beam.Map(lambda x: print(x.to_string()))
)
Результирующие кадры данных
a b
0 1 2
a b
0 3 4
a b
0 5 6