Как мне сделать pyspark udf с необязательной переменной и обязательным вводом - PullRequest
1 голос
/ 09 мая 2019

У меня есть файл python egg, и я хочу использовать его в качестве pyspark udf для применения к файлу данных spark

class MY_TEST(object):
    def __init__(self, df_in):
        self.df = df_in

    def TEST_FUNCTION(self, type, col1=None, col2=None, col3=None):
        if col1:
            col1 = col1
        else:
            col1 = self.df['col1'].get_values()[0]

        if col2:
            col2 = col2
        else:
            col2 = self.df['col2'].get_values()[0]

        if col3:
            col3 = col3
        else:
            col3 = self.df['col3'].get_values()[0]

        if type=='add':
            final = col1 + col2 + col3
        else:
            final = col1 * col2 * col3


        return final

когда я бегу на своей машине:

импорт панд как pd

df_in = pd.DataFrame()
df_in['col1'] = [1]
df_in['col2'] = [2]
df_in['col3'] = [5]

df_in['col1'] = df_in['col1'].astype(float)
df_in['col2'] = df_in['col2'].astype(float)
df_in['col3'] = df_in['col3'].astype(float)

MY_TEST_init = MY_TEST(df_in)
print('add                 ', MY_TEST_init.TEST_FUNCTION('add'))
print('add Col1 manipulate ', MY_TEST_init.TEST_FUNCTION('add', col1=5))
print('mult                ', MY_TEST_init.TEST_FUNCTION('mult'))

Результаты такие (как и ожидалось):

add                  8.0
add Col1 manipulate  12.0
mult                 10.0

Однако, когда я попытался поместить его в pyspark

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.sql.functions import size, lit
from pyspark.sql.functions import countDistinct
from pyspark.sql import functions as F
from StringIO import StringIO
import pandas as pd

spark.sparkContext.addPyFile("<PATH>-MY_TEST.egg")
import MY_TEST as MY_TEST

df_in = pd.DataFrame()
df_in['col1'] = [1]
df_in['col2'] = [2]
df_in['col3'] = [5]

df_in['col1'] = df_in['col1'].astype(float)
df_in['col2'] = df_in['col2'].astype(float)
df_in['col3'] = df_in['col3'].astype(float)

sqlCtx = SQLContext(sc)
df_spark = spark.createDataFrame(df_in)
df_spark = df_spark.withColumn("test", lit('add'))

MY_TEST_init = MY_TEST.MY_TEST(df_in)

TEST_FUNCTION_udf = udf(MY_TEST_init.TEST_FUNCTION, FloatType())
result = df_spark.withColumn('result', TEST_FUNCTION_udf(df_spark['test']))

Я ожидал результата, но вместо этого получил ошибку:

org.apache.spark.SparkException: задание прервано из-за сбоя этапа: сбой задачи 10 на этапе 810.0 4 раза, последний сбой: сброшено задание 10.3 на этапе 810.0 (TID 66758, 172.60.250.7, исполнитель 47): org .apache.spark.api.python.PythonException: трассировка (последний вызов был последним):

...