PySpark: Попытка сослаться на SparkContext из трансляции - PullRequest
0 голосов
/ 11 апреля 2020

Я пытаюсь оценить (предсказать вероятности) данные, используя протравленную модель XGBoost, используя PandasUDF. Мой код содержится в конструкции класса. Я получаю следующее сообщение об ошибке и не знаю, как это исправить. Не было ошибок при этом без занятий.

Ошибка:

"It appears that you are attempting to reference SparkContext from a broadcast "
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

Код:

import findspark
from pyspark import SparkContext, SparkConf 
from pyspark.sql import HiveContext, SQLContext
import os

findspark.init('/opt/cloudera/parcels/SPARK2/lib/spark2')
os.environ['JAVA_HOME'] = "/usr/java/jdk1.8.0_151-cloudera"
os.environ['PYSPARK_PYTHON'] = "/opt/anaconda3/bin/python"
sc = SparkContext()

sqlContext = SQLContext(sc)
hc = HiveContext(sc)
conf.set("spark.sql.execution.arrow.enabled", "true")

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import *

import pandas as pd 
import numpy as np
import datetime
import pickle
import warnings
warnings.filterwarnings('ignore')

class Score:

    def __init__(self, df, model, feat):
        self.df = df
        self.model = model
        self.feat = feat

    def calculateProbability(self):
        self.df.registerTempTable('df')
        self.df = hc.sql("""
        SELECT *, row_id % 10 AS partition_id
        FROM (SELECT *, row_number() over (ORDER BY rand()) AS row_id FROM df)
        """)

        schema = StructType([StructField('X', IntegerType(), True),
                     StructField('Y', IntegerType(), True),
                     StructField('Z', IntegerType(), True),
                     StructField('SCORE', DoubleType(), True)])

        @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
        def scoreData(df):
            prediction = self.model.predict_proba(df[self.feat], ntree_limit = model1.best_ntree_limit)[:,1]
            col1 = df['X']
            col2 = df['Y']
            col3 = df['X']
            col4 = prediction
            return pd.DataFrame({'X': col1, 'Y': col2, 'X': col3, 'SCORE': col4})

        self.df_scored = self.df.groupby('partition_id').apply(scoreData)

model представляет собой файл рассылки, а feat - список характеристик модели.

Пожалуйста, укажите, где я иду не так и как это исправить. Спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...