Контекст: моя компания в Spark 2.2, поэтому невозможно использовать pandas_udf для распределенной обработки столбцов
У меня есть фреймы данных, которые содержат тысячи столбцов (функций) и миллионы записей
df = spark.createDataFrame([(1,"AB", 100, 200,1), (2, "AC", 150,200,2), (3,"AD", 80,150,0)],["Id","Region","Salary", "HouseHoldIncome", "NumChild"])
Я хотел бы выполнить некоторые сводные данные и статистику по каждому столбцу параллельно и задаться вопросом, как лучше всего добиться этого
#The point is any kind of customized summary can exist in my stat1, acting on a Spark dataframe to exploit the distributed processing; of one single column
def stat1(df_1_col):
if (datatype is not numeric):
return "NA"
max_df = df_1_col.groupby().max().collect()
if (max_df >50):
return df_1_col.map(....).reduceByKey(...)
else:
return get_1st_decile(df_1_col)/df_1_col.agg(mean())
Я хотел бы достичь
+-------+------------------+-------------------+--------------------+
|col_name| stat1| stat2| stat3|
+-------+------------------+-------------------+--------------------+
| Id| 10| 10| 10|
|Salary| 4.5| 0.5215336029384192|-0.01309370117407197|
| HouseholdIncome|2.8722813232690143| 0.229328162820653| 0.5756058014772729|
+-------+------------------+-------------------+--------------------+
Вот мои вопросы:
1 / Как я могу добиться этой распределенной обработки без pandas_udf?
2 / В худшем случае мне нужно использовать цикл.
col_list = ["Id","Salary", "HouseHoldIncome", "NumChild"]
for col in col_list:
....#how to call stat1[col] properly and collect to final result
Как мы должны правильно написать это, чтобы получить вышеуказанную форму. Насколько я понимаю, .withColumn()
и udf не могут использоваться здесь, потому что для этого требуется collect_list, чтобы сгладить мой фрейм данных столбца, чтобы перечислить и потерять мощность параллельной обработки Spark DF; не говоря уже о том, что я пробовал _collect_list_ на 10 миллионах записей, а список слишком большой для обработки
.groupBy().agg(stat1_udf(collect_list('data')).alias('data'))
Ссылка здесь
3 / Если нам нужно использовать цикл for, будет ли Spark обрабатывать все столбцы параллельно?
Согласно здесь , цикл for для столбцов может обрабатываться параллельно! Но, насколько я понимаю, это работает, потому что это в ряд и только трансформации участвуют. Таким образом, мы можем сказать, что на шаге цикла, построчные преобразования добавляются только в DAG без каких-либо оценок . Поэтому мы подготовили df_col1-> Transformation (df_col_1), df_col2-> Transformation (df_col_2) и т. Д. В DAG. На шаге действия они будут распространяться мастером Spark и обрабатываться параллельно.
Однако для моего случая это сводка, которая требует уменьшить , сумму , означает или немного собрать и т. Д. поэтому каждый цикл / столбец вынужден выполнять оценку до того, как поступит следующий. Группа обеспечения доступности баз данных не может ждать, но должна выполнить df_col1-> Transformation_and_Action (df_col_1) -> df_col2-> Transformation_and_Action (df_col_2), сделав ее последовательной для тысяч столбцов
Есть ли кто-нибудь?