Распределено для цикла в фрейме данных pyspark - PullRequest
1 голос
/ 23 марта 2019

Контекст: моя компания в Spark 2.2, поэтому невозможно использовать pandas_udf для распределенной обработки столбцов

У меня есть фреймы данных, которые содержат тысячи столбцов (функций) и миллионы записей

df = spark.createDataFrame([(1,"AB", 100, 200,1), (2, "AC", 150,200,2), (3,"AD", 80,150,0)],["Id","Region","Salary", "HouseHoldIncome", "NumChild"])

Я хотел бы выполнить некоторые сводные данные и статистику по каждому столбцу параллельно и задаться вопросом, как лучше всего добиться этого

#The point is any kind of customized summary can exist in my stat1, acting on a Spark dataframe to exploit the distributed processing; of one single column
def stat1(df_1_col):
   if (datatype is not numeric):
      return "NA"
   max_df = df_1_col.groupby().max().collect()
   if (max_df >50):
     return df_1_col.map(....).reduceByKey(...)
   else:
     return get_1st_decile(df_1_col)/df_1_col.agg(mean())

Я хотел бы достичь

+-------+------------------+-------------------+--------------------+
    |col_name|            stat1|       stat2|            stat3|
    +-------+------------------+-------------------+--------------------+
    |  Id|                10|                 10|                  10|
    |Salary|               4.5| 0.5215336029384192|-0.01309370117407197|
    | HouseholdIncome|2.8722813232690143|  0.229328162820653|  0.5756058014772729|
     +-------+------------------+-------------------+--------------------+

Вот мои вопросы:

1 / Как я могу добиться этой распределенной обработки без pandas_udf?

2 / В худшем случае мне нужно использовать цикл.

   col_list = ["Id","Salary", "HouseHoldIncome", "NumChild"]
        for col in col_list:
          ....#how to call stat1[col] properly and collect to final result

Как мы должны правильно написать это, чтобы получить вышеуказанную форму. Насколько я понимаю, .withColumn() и udf не могут использоваться здесь, потому что для этого требуется collect_list, чтобы сгладить мой фрейм данных столбца, чтобы перечислить и потерять мощность параллельной обработки Spark DF; не говоря уже о том, что я пробовал _collect_list_ на 10 миллионах записей, а список слишком большой для обработки

.groupBy().agg(stat1_udf(collect_list('data')).alias('data'))

Ссылка здесь

3 / Если нам нужно использовать цикл for, будет ли Spark обрабатывать все столбцы параллельно? Согласно здесь , цикл for для столбцов может обрабатываться параллельно! Но, насколько я понимаю, это работает, потому что это в ряд и только трансформации участвуют. Таким образом, мы можем сказать, что на шаге цикла, построчные преобразования добавляются только в DAG без каких-либо оценок . Поэтому мы подготовили df_col1-> Transformation (df_col_1), df_col2-> Transformation (df_col_2) и т. Д. В DAG. На шаге действия они будут распространяться мастером Spark и обрабатываться параллельно.

Однако для моего случая это сводка, которая требует уменьшить , сумму , означает или немного собрать и т. Д. поэтому каждый цикл / столбец вынужден выполнять оценку до того, как поступит следующий. Группа обеспечения доступности баз данных не может ждать, но должна выполнить df_col1-> Transformation_and_Action (df_col_1) -> df_col2-> Transformation_and_Action (df_col_2), сделав ее последовательной для тысяч столбцов

Есть ли кто-нибудь?

...