Я пишу пользовательскую библиотеку для своего приложения PySpark, и для некоторых файлов CSV необходимо выполнить небольшую предварительную обработку с использованием библиотеки Pandas.Предполагается, что предварительная обработка (ну, я так думаю) должна выполняться на узле драйвера, поскольку сам входной файл хранится в драйвере, а не в HDFS.Однако после того, как я добавил библиотеку как пакет с помощью функции addPyFile
, импортировал требуемые методы и выполнил функцию, она вызывает ImportError
.
Структура пакета выглядит следующим образом
module
|- __init__.py
|- module_1.py
|- module_2.py
|- sub_module_1
|- __init__.py
|- sub_mod_1.py
|- ...
Что я делаю в своем скрипте Python Runner:
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sparkContext.addPyFile("module.zip")
from module import module_1
module_1.func(spark, configs) # Exception raised here
В module_1.py
у меня есть
import pandas as pd
from sub_module_1 import sub_mod_1
def func(spark, configs):
input_local_file = configs.get("SOME_SECTION", "local_file")
input_hdfs_file = configs.get("SOME_SECTION", "hdfs_file")
output_hdfs_destination = configs.get("SOME_SECTION", "hdfs_dest")
# Reads input file
lf_pdf = pd.read_csv(input_local_file)
# Convert pandas dataframe to dictionary object
transformed_dict = to_dictionary(lf_pdf)
# Log printed
# Writes to hdfs, wraps a mapPartitions function
another_method(transformed_dict, input_hdfs_file, output_hdfs_destination)
Итак, значит ли это, что даже если я нена самом деле использовать Pandas в рабочих узлах, если пакет требует модуля и распространяется через опцию addPyFile
, это также потребует установки библиотеки Pandas на рабочих?Дело в том, что module_2
делает почти то же самое, за исключением того, что кадр данных Pandas вместо этого преобразуется в кадр данных Spark, но он не вызывает то же исключение.
Полное сообщение об ошибке:
WARN scheduler.TaskSetManager: Lost task 48.2 in stage 4.0 (TID 167, somewhere.org, executor 35): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2/python/pyspark/worker.py", line 166, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2/python/pyspark/worker.py", line 57, in read_command
command = serializer.loads(command.value)
File "/opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2/python/pyspark/serializers.py", line 454, in loads
return pickle.loads(obj)
File "./module.zip/module/module_1.py", line 15, in <module>
ImportError: No module named pandas
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
РЕДАКТИРОВАТЬ: Я также регистрировал шаги в моем приложении, и точка, в которой возникает эта ошибка, после завершения предварительной обработки, вот почему яЯ не уверен, почему это происходит, поскольку Панды больше никогда не используются.