Python / Apache -Beam: Как разобрать текстовый файл в CSV? - PullRequest
0 голосов
/ 31 января 2020

Я все еще новичок в Beam, но как именно вы читаете из файлов CSV, которые находятся в GCS Buckets? По сути, мне нужно преобразовать эти файлы в pandas фрейм данных, используя Beam, а затем применить модель sklearn для «обучения» этих данных. Большинство примеров, которые я видел, предопределяют заголовок, я хочу, чтобы этот конвейер Beam распространялся на любые файлы, где заголовки определенно будут другими. Есть библиотека с именем beam_utils , которая делает то, что я хочу, но затем я сталкиваюсь с этой ошибкой: AttributeError: module 'apache_beam.io.fileio' has no attribute 'CompressionTypes'

Пример кода:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# The error occurs in this import
from beam_utils.sources import CsvFileSource

options = {
    'project': 'my-project',
    'runner:': 'DirectRunner',
    'streaming': False
}

pipeline_options = PipelineOptions(flags=[], **options)

class Printer(beam.DoFn):
    def process(self, element):
        print(element)

with beam.Pipeline(options=pipeline_options) as p:  # Create the Pipeline with the specified options.

    data = (p
            | 'Read File From GCS' >> beam.io.textio.ReadFromText('gs://my-csv-files')
            )

    _ = (data | "Print the data" >> beam.ParDo(Printer()))

result = p.run()
result.wait_until_finish()

1 Ответ

1 голос
/ 07 февраля 2020

Модуль Apache Beam fileio был недавно изменен с учетом несовместимых изменений, а библиотека beam_utils еще не обновлена.

Я прошел вопрос предложенный @Pablo и исходным кодом beam_utils (также написанным Pablo) для репликации поведения с использованием модуля filesystems.

Ниже приведены две версии кода с использованием pandas для генерации DataFrame (s).

CSV, используемый для примера:

a,b
1,2
3,4
5,6

Чтение CSV и создание DataFrame со всеми его содержимое

import apache_beam as beam
import pandas as pd
import csv
import io

def create_dataframe(readable_file):

    # Open a channel to read the file from GCS
    gcs_file = beam.io.filesystems.FileSystems.open(readable_file)

    # Read it as csv, you can also use csv.reader
    csv_dict = csv.DictReader(io.TextIOWrapper(gcs_file))

    # Create the DataFrame
    dataFrame = pd.DataFrame(csv_dict)
    print(dataFrame.to_string())

p = beam.Pipeline()
(p | beam.Create(['gs://my-bucket/my-file.csv'])
   | beam.FlatMap(create_dataframe)
)

p.run()

Результирующий DataFrame

   a  b
0  1  2
1  3  4
2  5  6

Чтение CSV и создание DataFrames в другом преобразовании

def get_csv_reader(readable_file):

    # Open a channel to read the file from GCS
    gcs_file = beam.io.filesystems.FileSystems.open(readable_file)

    # Return the csv reader
    return  csv.DictReader(io.TextIOWrapper(gcs_file))

p = beam.Pipeline()
(p | beam.Create(['gs://my-bucket/my-file.csv'])
   | beam.FlatMap(get_csv_reader)
   | beam.Map(lambda x: pd.DataFrame([x])) # Create the DataFrame from each csv row
   | beam.Map(lambda x: print(x.to_string()))
)

Результирующие кадры данных

   a  b
0  1  2
   a  b
0  3  4
   a  b
0  5  6
...