Я получаю сообщение «TypeError: Ожидаемый вызываемый объект вместо: None» при выполнении задания потока данных. Пожалуйста, помогите мне решить эту проблему
Я читаю csvfile данных из локальных файлов и разбиваю файлы на основе значений столбцов и переношу их в отдельные файлы.
class class2(beam.DoFn):
def __init__(self,source_file,outfilelocation):
self.source_file=source_file
self.outfilelocation=outfilelocation
def filespliter(self):
import pandas as pd
import os
df = pd.read_csv(self.source_file)
df['newcolumn'] = pd.to_datetime(df['datetime'], format='%Y-%m-%d-%H.%M.%S.%f').dt.date
df = df.sort_values('newcolumn') # Sorting isn't needed
for key in df['newcolumn'].unique():
file_path = self.outfilelocation +"%s/file_name_%s.csv" % (key, key)
if os.path.exists(file_path):
data_split = df[df['new_insert_date'] == key]
data_split.to_csv(file_path, header=True, index=False)
else:
os.makedirs(os.path.dirname(file_path))
data_split = df[df['new_insert_date'] == key]
data_split.to_csv(file_path, header=True, index=False)
input_file = 'sourcefile.csv'
output_file= 'target_file_directory'
( p
| "spliting_source_file" >> beam.ParDo(class2(input_file,output_file))
)
p.run()