Неявная схема для pandas_udf в PySpark? - PullRequest
0 голосов
/ 19 февраля 2019

Этот ответ хорошо объясняет, как использовать groupby и pandas_udf для pyspark для создания пользовательских агрегатов.Однако я не могу объявить свою схему вручную, как показано в этой части примера

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_min", DoubleType())
])

, поскольку я буду возвращать более 100 столбцов с автоматически генерируемыми именами.Можно ли как-то сказать PySpark просто неосторожно использовать схему, возвращенную моей функцией, и предположить, что она будет одинаковой для всех рабочих узлов?Эта схема также изменится во время прогонов, так как мне придется поиграться с предикторами, которые я хочу использовать, так что автоматизированный процесс для генерации схемы может быть вариантом ...

Ответы [ 2 ]

0 голосов
/ 19 февраля 2019

На основании комментария Sanxofons, Я получил представление о том, как реализовать это сам:

from pyspark.sql.types import *

mapping = {"float64": DoubleType,
           "object":StringType,
           "int64":IntegerType} # Incomplete - extend with your types.

def createUDFSchemaFromPandas(dfp):
  column_types  = [StructField(key, mapping[str(dfp.dtypes[key])]()) for key in dfp.columns]
  schema = StructType(column_types)
  return schema

Что я делаю, это получаю образец pandas df, передаю его функции,и посмотрим, что вернется:

dfp = df_total.limit(100).toPandas()
df_return = my_UDF_function(dfp)
schema = createUDFSchemaFromPandas(df_return)

Кажется, это работает для меня.Проблема в том, что он является рекурсивным (нужно определить функцию, чтобы получить схему, иметь схему для определения как udf).Я решил эту проблему, создав UDF-оболочку, которая просто передает кадр данных.

0 голосов
/ 19 февраля 2019

К сожалению, такой опции нет.Схема должна быть известна статически, прежде чем какой-либо компонент будет оценен, поэтому любой вывод формы, основанный на реальных данных, просто отсутствует в таблице.

Если внутренний процесс каким-то образом основан на генерации кода, лучшим вариантом будет интеграциягенерация логики и схемы.Например

def describe(cols, fun):
    schema = StructType([StructField(c, DoubleType()) for c in cols])
    @pandas_udf(schema, PandasUDFType, PandasUDFType.GROUPED_MAP)
    def _(df):
        return df[cols].agg([fun])
    return _

df = spark.createDataFrame([(1, 2.0, 1.0, 3.0), (1, 4.0, 2.0, 5.0)], ("id", "x", "y", "z"))

df.groupBy("id").apply(describe(["x"], "mean")).show()                                         
# +---+                                                                           
# |  x|
# +---+
# |3.0|
#+---+


df.groupBy("id").apply(describe(["x", "y"], "mean")).show()                                    
# +---+---+                                                                       
# |  x|  y|
# +---+---+
# |3.0|1.5|
# +---+---+
...