Я пытаюсь прочитать файл CSV (с заголовком) из GCS, который содержит около 150 столбцов, а затем
1. Установить данные столбца для определенных столбцов
2. Обновить значение NaN со значениями NULL для всех столбцов
3. Запишите файл csv (с заголовком) в GCS
Вот сложная часть: обработка выполняется в облачном потоке данных, что означает, что для этого мне нужно использовать преобразования лучей Apache.
Я пыталсянесколько способов, таких как skipping_header_lines и использование схемы
Мой код конвейера:
def parse_method(self, line):
reader = csv.reader(line.split('\n'))
for csv_row in reader:
values = [x.decode('utf8') for x in csv_row]
row = []
for value in csv_row:
if value == 'NaN':
value = 'Null'
row.append(value)
return row
(p
| 'Read_from_source' >> beam.io.ReadFromText('gs://{0}/test.csv'.format(BUCKET))
| 'Split' >> beam.Map(lambda s: data_ingestion.parse_method(s))
| 'Write_to_dest' >> beam.io.WriteToText(output_prefix,file_name_suffix='.csv', num_shards=1))
Например: если входные данные моего csv содержат;
имя custom1 custom2
arun undefined Nan
dany losangels временный
ожидаемый csv;
имя custom1 custom2
arun losangels Null
dany losangels временный