Очистка данных в файлах CSV с использованием потока данных - PullRequest
0 голосов
/ 13 апреля 2019

Я пытаюсь прочитать файл CSV (с заголовком) из GCS, который содержит около 150 столбцов, а затем
1. Установить данные столбца для определенных столбцов
2. Обновить значение NaN со значениями NULL для всех столбцов
3. Запишите файл csv (с заголовком) в GCS

Вот сложная часть: обработка выполняется в облачном потоке данных, что означает, что для этого мне нужно использовать преобразования лучей Apache.
Я пыталсянесколько способов, таких как skipping_header_lines и использование схемы

Мой код конвейера:


def parse_method(self, line):    
    reader = csv.reader(line.split('\n'))
    for csv_row in reader:
        values = [x.decode('utf8') for x in csv_row]
        row = []
        for value in csv_row:
            if value == 'NaN':
                value = 'Null'
            row.append(value)
    return row

(p
    | 'Read_from_source' >> beam.io.ReadFromText('gs://{0}/test.csv'.format(BUCKET))
    | 'Split' >> beam.Map(lambda s: data_ingestion.parse_method(s))
    | 'Write_to_dest' >> beam.io.WriteToText(output_prefix,file_name_suffix='.csv', num_shards=1))

Например: если входные данные моего csv содержат;

имя custom1 custom2
arun undefined Nan
dany losangels временный

ожидаемый csv;
имя custom1 custom2
arun losangels Null
dany losangels временный

1 Ответ

1 голос
/ 17 апреля 2019

При использовании следующего получается результат, который вы ищете:

    lines = p | ReadFromText(file_pattern="gs://<my-bucket>/input_file.csv")

    def parse_method(line):
        import csv
        reader = csv.reader(line.split('\n'))
        for csv_row in reader:
            values = [x.decode('utf8') for x in csv_row]
            row = []
            for value in csv_row:
                if value == 'NaN':
                    value = 'Null'
                row.append(value)

        return ",".join(row)



    lines = lines | 'Split' >> beam.Map(parse_method)
    line = lines | 'Output to file' >> WriteToText(file_pattern="gs://<my-bucket>/output_file.csv")

Теперь для редактирования столбцов, основанных на заголовке, я не уверен, есть ли какой-то более простой способ, но я бы использовал панд следующим образом:

    lines = p | "ReadFromText" >> ReadFromText(file_pattern="gs://<my-bucket>/input_file.csv")

    def parse_method(line):
        import pandas as pd

        line = line.split(',')
        df = pd.DataFrame(data=[line],columns=['name','custom1','custom2'])
        df['custom2'] = df.custom2.apply(lambda x: 'None' if x == 'Nan' else x)
        values = list(df.loc[0].values)
        return ",".join(values)

    lines = lines | "Split" >> beam.Map(parse_method)
    line = lines | "Output to file" >> WriteToText(file_path_prefix="gs://<my-bucket>/output_file.csv")
...