Я делаю простой PySpark с консоли Jupyter и сталкиваюсь с проблемой, когда пытаюсь вызвать внешний код.Мой минимальный пример кода имеет одну зависимость, testpackage.zip
.Когда я вызываю функцию UserDefinedFunction, которая использует код в этом пакете, я получаю AttributeError
, показывающий, что Spark не может найти функцию в testpackage.zip.
testpackage
- это каталог, содержащий пустые __init__.py
и testmod.py
, который содержит
def testfunc(x):
return float(x)+1.33
Минимальный пример кода, я запускаю его в консоли Jupyter по одному блоку за раз:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction as udf
from pyspark.sql.types import DoubleType
sess = SparkSession.builder.appName("testing").getOrCreate()
sc = sess.sparkContext
DEP_PATH = < path on driver >.testpackage.zip
sc.addPyFile(DEP_PATH)
import testpackage
df = sess.range(0, 12*1000*1000, numPartitions=12)
test_udf = udf(lambda x: testpackage.testmod.testfunc(x), DoubleType())
df = df.withColumn("udf_res", test_udf(df.id))
df.head(5) # error
Обратите внимание, что <путь к драйверу> (каталог, в котором находится зависимость) находится в моей PYTHONPATH.
Сообщение об ошибке читается частично:
«AttributeError: модуль« testpackage »не имеет атрибута« testmod »».
Я не уверен, связано ли это с тем, что я звоню testfunc
в формате udf, илиесли я просто не добавляю зависимость правильно.Каков наилучший способ добавления зависимости Python программно (без использования spark-submit)?