У меня есть файл python egg, и я хочу использовать его в качестве pyspark udf для применения к файлу данных spark
class MY_TEST(object):
def __init__(self, df_in):
self.df = df_in
def TEST_FUNCTION(self, type, col1=None, col2=None, col3=None):
if col1:
col1 = col1
else:
col1 = self.df['col1'].get_values()[0]
if col2:
col2 = col2
else:
col2 = self.df['col2'].get_values()[0]
if col3:
col3 = col3
else:
col3 = self.df['col3'].get_values()[0]
if type=='add':
final = col1 + col2 + col3
else:
final = col1 * col2 * col3
return final
когда я бегу на своей машине:
импорт панд как pd
df_in = pd.DataFrame()
df_in['col1'] = [1]
df_in['col2'] = [2]
df_in['col3'] = [5]
df_in['col1'] = df_in['col1'].astype(float)
df_in['col2'] = df_in['col2'].astype(float)
df_in['col3'] = df_in['col3'].astype(float)
MY_TEST_init = MY_TEST(df_in)
print('add ', MY_TEST_init.TEST_FUNCTION('add'))
print('add Col1 manipulate ', MY_TEST_init.TEST_FUNCTION('add', col1=5))
print('mult ', MY_TEST_init.TEST_FUNCTION('mult'))
Результаты такие (как и ожидалось):
add 8.0
add Col1 manipulate 12.0
mult 10.0
Однако, когда я попытался поместить его в pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.sql.functions import size, lit
from pyspark.sql.functions import countDistinct
from pyspark.sql import functions as F
from StringIO import StringIO
import pandas as pd
spark.sparkContext.addPyFile("<PATH>-MY_TEST.egg")
import MY_TEST as MY_TEST
df_in = pd.DataFrame()
df_in['col1'] = [1]
df_in['col2'] = [2]
df_in['col3'] = [5]
df_in['col1'] = df_in['col1'].astype(float)
df_in['col2'] = df_in['col2'].astype(float)
df_in['col3'] = df_in['col3'].astype(float)
sqlCtx = SQLContext(sc)
df_spark = spark.createDataFrame(df_in)
df_spark = df_spark.withColumn("test", lit('add'))
MY_TEST_init = MY_TEST.MY_TEST(df_in)
TEST_FUNCTION_udf = udf(MY_TEST_init.TEST_FUNCTION, FloatType())
result = df_spark.withColumn('result', TEST_FUNCTION_udf(df_spark['test']))
Я ожидал результата, но вместо этого получил ошибку:
org.apache.spark.SparkException: задание прервано из-за сбоя этапа: сбой задачи 10 на этапе 810.0 4 раза, последний сбой: сброшено задание 10.3 на этапе 810.0 (TID 66758, 172.60.250.7, исполнитель 47): org .apache.spark.api.python.PythonException: трассировка (последний вызов был последним):