Я пытаюсь оценить (предсказать вероятности) данные, используя протравленную модель XGBoost, используя PandasUDF. Мой код содержится в конструкции класса. Я получаю следующее сообщение об ошибке и не знаю, как это исправить. Не было ошибок при этом без занятий.
Ошибка:
"It appears that you are attempting to reference SparkContext from a broadcast "
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
Код:
import findspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext, SQLContext
import os
findspark.init('/opt/cloudera/parcels/SPARK2/lib/spark2')
os.environ['JAVA_HOME'] = "/usr/java/jdk1.8.0_151-cloudera"
os.environ['PYSPARK_PYTHON'] = "/opt/anaconda3/bin/python"
sc = SparkContext()
sqlContext = SQLContext(sc)
hc = HiveContext(sc)
conf.set("spark.sql.execution.arrow.enabled", "true")
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import *
import pandas as pd
import numpy as np
import datetime
import pickle
import warnings
warnings.filterwarnings('ignore')
class Score:
def __init__(self, df, model, feat):
self.df = df
self.model = model
self.feat = feat
def calculateProbability(self):
self.df.registerTempTable('df')
self.df = hc.sql("""
SELECT *, row_id % 10 AS partition_id
FROM (SELECT *, row_number() over (ORDER BY rand()) AS row_id FROM df)
""")
schema = StructType([StructField('X', IntegerType(), True),
StructField('Y', IntegerType(), True),
StructField('Z', IntegerType(), True),
StructField('SCORE', DoubleType(), True)])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def scoreData(df):
prediction = self.model.predict_proba(df[self.feat], ntree_limit = model1.best_ntree_limit)[:,1]
col1 = df['X']
col2 = df['Y']
col3 = df['X']
col4 = prediction
return pd.DataFrame({'X': col1, 'Y': col2, 'X': col3, 'SCORE': col4})
self.df_scored = self.df.groupby('partition_id').apply(scoreData)
model
представляет собой файл рассылки, а feat
- список характеристик модели.
Пожалуйста, укажите, где я иду не так и как это исправить. Спасибо.