Почему мой вызов udf не видит зависимости, добавленной с помощью SparkContext.addPyFile? - PullRequest
0 голосов
/ 25 января 2019

Я делаю простой PySpark с консоли Jupyter и сталкиваюсь с проблемой, когда пытаюсь вызвать внешний код.Мой минимальный пример кода имеет одну зависимость, testpackage.zip.Когда я вызываю функцию UserDefinedFunction, которая использует код в этом пакете, я получаю AttributeError, показывающий, что Spark не может найти функцию в testpackage.zip.

testpackage - это каталог, содержащий пустые __init__.py и testmod.py, который содержит

def testfunc(x):
    return float(x)+1.33

Минимальный пример кода, я запускаю его в консоли Jupyter по одному блоку за раз:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction as udf
from pyspark.sql.types import DoubleType

sess = SparkSession.builder.appName("testing").getOrCreate()
sc = sess.sparkContext

DEP_PATH = < path on driver >.testpackage.zip
sc.addPyFile(DEP_PATH)
import testpackage

df = sess.range(0, 12*1000*1000, numPartitions=12)

test_udf = udf(lambda x: testpackage.testmod.testfunc(x), DoubleType())

df = df.withColumn("udf_res", test_udf(df.id))

df.head(5) # error

Обратите внимание, что <путь к драйверу> (каталог, в котором находится зависимость) находится в моей PYTHONPATH.

Сообщение об ошибке читается частично:

«AttributeError: модуль« testpackage »не имеет атрибута« testmod »».

Я не уверен, связано ли это с тем, что я звоню testfunc в формате udf, илиесли я просто не добавляю зависимость правильно.Каков наилучший способ добавления зависимости Python программно (без использования spark-submit)?

1 Ответ

0 голосов
/ 26 января 2019

Вам необходимо импортировать модуль следующим образом. Можете ли вы попробовать ниже -

из testpackage.testmod import *

test_udf = udf(lambda x: testfunc(x), DoubleType())
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...