Я пытаюсь разделить CSV-файл на несколько CSV-файлов на основе значения столбца. Но сейчас я использую приведенный ниже код, но я могу создать файл на основе значения фильтра, но в файлах есть пустые записи. Не могли бы вы, пожалуйста, разобраться в проблеме.
Пример входных данных: введите описание изображения здесь Пример выходных файлов: введите описание изображения здесь введите изображениеописание здесь
Пожалуйста, найдите шаги, которые я выполнил
- Чтение CSV-файла как dict в python3.
- Извлечение ключа из dict и преобразование значений ключав список.
- Итерация по списку и фильтрация словаря по значениям списка
- Запись в отдельные файлы
`
import apache_beam as beam
import csv
import uuid
from apache_beam.options.pipeline_options import PipelineOptions
dict_reader = csv.DictReader(open(input_file,'r'))
insert_date_lst = []
for i in dict_reader:
insert_date_lst.append(i.get("key"))
class TagData(beam.DoFn):
def process(self, element):
key = element.get('key')
yield TaggedOutput(key, element)
with beam.Pipeline(options=PipelineOptions()) as p:
data = p | "dict_read" >> beam.Create(dict_reader)
for i in list(dict.fromkeys(insert_date_lst)):
filter_data = data | "filter"+i >> beam.Filter(lambda x : x['key']==i)
processed_tagged_log = filter_data | "tagged-data-by-key " >> beam.ParDo(TagData()).with_outputs(
*list(dict.fromkeys(insert_date_lst)))
for i in list(dict.fromkeys(insert_date_lst)):
processed_tagged_log[i] | "save file %s" % uuid.uuid4() >> beam.io.WriteToText(output_file + i ,num_shards=0,shard_name_template="",file_name_suffix='.csv')