Разделите фрейм данных Pyspark на подмножества, примените функцию и запишите выходные данные в несколько файлов - PullRequest
0 голосов
/ 18 февраля 2020

У меня есть текстовый файл фиксированной ширины, содержащий несколько «таблиц».

table1  dataneedsseparation154
table1  heresplitbadlyneedd432
table2  it'salwaysdifferent
...

Я хочу прочитать его и обработать с помощью pyspark. Каждая таблица имеет свою собственную схему, поэтому я хотел бы получить что-то вроде этого, используя схемы, подобные этой:

table1: col1 char(4)
        col2 char(5)
        col3 char(10)
        col4 int(3)
table2: col1 char(19)
table1:
id|col1|col2 |col3      |col4
-------|-----|----------|-----
 1|data|needs|separation|154
 2|here|split|badlyneedd|432

table2:
id|col1
------------------
1 |it'salwaysdifferent

Это означает, что я хочу как-то разделить / сгруппировать по первому столбцу, применить Схема, а затем записать в отдельные файлы. То, что я сделал, использует это понимание списка, но это, конечно, не распараллелено.

tables_list = [(table_name,apply_schema(df.filter(df['table']==table_name),table_name) for table_name in tables]
[table.write.format('parquet').save(f'PATH/{table_name}.parquet') for table_name,table in tables_list]

Как я могу обработать это за один go с файлами, записываемыми параллельно?

Я также подумал об использовании функции partitionBy в dataframe, но потом я не смог Не могу понять, возможно ли применить функцию apply_schema перед записью.

Может ли UDAF или window_function иметь дело с распределенными записями?

--- редактировать:

минимальный пример:

df = spark.createDataFrame(
    [
        ("table1", "dataneedsseparation154"),
        ("table1", "heresplitbadlyneedd432"),
        ("table2", "it'salwaysdifferent"),
    ],
    ('table', 'raw_string')
)
schema = {'table1':{'col1':(0,4),'col2':(5,5),'col3':(10,10),'col4':(20,3)},'table2':{'col1':(0,19)}}

def apply_schema(df,table_name):
  for column,(start,length) in schema[table_name].items():
    df = df.withColumn(column,df['raw_string'].substr(start,length))
  df = df.drop('raw_string')
  return df

result = [apply_schema(df.filter(df['table']==table),table) for table in schema.keys()]

Желаемый результат:

spark.createDataFrame(
    [
        ("table1", "data","needs","separation","154"),
        ("table1", "here","split","badlyneedd","432")
    ],
    ('table', 'col1','col2','col3','col4')
).write.format('parquet').save('table1.parquet')
spark.createDataFrame(
    [
        ("table2", "it'salwaysdifferent"),
    ],
    ('table', 'col1')
).write.format('parquet').save('table2.parquet')

Вопрос в том, как получить результат Перечислите файлы в паркет (параллельным способом), или если описанный выше способ вообще является правильным способом для получения преобразованных файлов паркета, распараллеленных.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...