У меня есть текстовый файл фиксированной ширины, содержащий несколько «таблиц».
table1 dataneedsseparation154
table1 heresplitbadlyneedd432
table2 it'salwaysdifferent
...
Я хочу прочитать его и обработать с помощью pyspark. Каждая таблица имеет свою собственную схему, поэтому я хотел бы получить что-то вроде этого, используя схемы, подобные этой:
table1: col1 char(4)
col2 char(5)
col3 char(10)
col4 int(3)
table2: col1 char(19)
table1:
id|col1|col2 |col3 |col4
-------|-----|----------|-----
1|data|needs|separation|154
2|here|split|badlyneedd|432
table2:
id|col1
------------------
1 |it'salwaysdifferent
Это означает, что я хочу как-то разделить / сгруппировать по первому столбцу, применить Схема, а затем записать в отдельные файлы. То, что я сделал, использует это понимание списка, но это, конечно, не распараллелено.
tables_list = [(table_name,apply_schema(df.filter(df['table']==table_name),table_name) for table_name in tables]
[table.write.format('parquet').save(f'PATH/{table_name}.parquet') for table_name,table in tables_list]
Как я могу обработать это за один go с файлами, записываемыми параллельно?
Я также подумал об использовании функции partitionBy в dataframe, но потом я не смог Не могу понять, возможно ли применить функцию apply_schema
перед записью.
Может ли UDAF или window_function иметь дело с распределенными записями?
--- редактировать:
минимальный пример:
df = spark.createDataFrame(
[
("table1", "dataneedsseparation154"),
("table1", "heresplitbadlyneedd432"),
("table2", "it'salwaysdifferent"),
],
('table', 'raw_string')
)
schema = {'table1':{'col1':(0,4),'col2':(5,5),'col3':(10,10),'col4':(20,3)},'table2':{'col1':(0,19)}}
def apply_schema(df,table_name):
for column,(start,length) in schema[table_name].items():
df = df.withColumn(column,df['raw_string'].substr(start,length))
df = df.drop('raw_string')
return df
result = [apply_schema(df.filter(df['table']==table),table) for table in schema.keys()]
Желаемый результат:
spark.createDataFrame(
[
("table1", "data","needs","separation","154"),
("table1", "here","split","badlyneedd","432")
],
('table', 'col1','col2','col3','col4')
).write.format('parquet').save('table1.parquet')
spark.createDataFrame(
[
("table2", "it'salwaysdifferent"),
],
('table', 'col1')
).write.format('parquet').save('table2.parquet')
Вопрос в том, как получить результат Перечислите файлы в паркет (параллельным способом), или если описанный выше способ вообще является правильным способом для получения преобразованных файлов паркета, распараллеленных.