Я работаю над проектом python spark, где изначально я написал скрипт для загрузки фрейма данных в postgres для конкретного клиента, который включает в себя служебную функцию, которая загружает данные в postgres.
df.rdd.repartition(self.__max_conn).foreachPartition(
lambda iterator: load_utils.load_tab_postgres(conn_prop=conn_prop,
tab_name=<tablename>,
iterator=iterator))
ПервоначальноВесь код был в одном модуле, включая приведенный выше фрагмент кода и load_utils (), который работал отлично.
Позже мне пришлось извлечь общий код, включая load_utils, в базовый модуль, который можно использовать в разных клиентских модулях.Это когда тот же код не с ошибкой ниже:
Файл "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py ", строка 764, в файле foreachPartition" /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/lib / pyspark.zip / pyspark / rdd.py ", строка 1004, в файле count" /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py ", строка 995, в общем файле" /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py ", строка 869, в сложенном виде Файл" /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/lib/pyspark.zip / pyspark / rdd.py ", строка 771, в файле сбора" /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py ", строка 813, в вызов Файл" /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py ", строка 45, в файле deco" / opt / cloudera / parcels / CDH-5.13.1-1.cdh5.13.1.p0.2 / lib / spark / python / lib / py4j-0.9-src.zip / py4j / protocol.py ", строка 308, в get_return_value py4j.protocol.Py4JJavaError: Произошла ошибка при вызове z: org.apache.spark.api.python.PythonRDD.collectAndServe.: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: сбой задачи 18 на этапе 126.0 4 раза, самый последний сбой: сбой задачи 18.3 на этапе 126.0 (TID 24028, tbsatad6r15g24.company.co.us, исполнитель 242): org.apache.spark.api.python.PythonException: обратная связь (последний вызов был последним): файл "/opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python / lib / pyspark.zip / pyspark / worker.py ", строка 98, в основной команде = pickleSer._read_with_length (infile) Файл" /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2 / lib / spark / python / lib / pyspark.zip / pyspark / serializers.py ", строка 164, в _read_with_length возвращает файл self.loads (obj)" /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2 / lib / spark / python / lib / pyspark.zip / pyspark / serializers.py ", строка 422, при загрузке возвращает файл pickle.loads (obj)" build / bdist.linux-x86_64 /egg / basemodule / init .py ", строка 12, в файле импорта basemodule.entitymodule.base" build / bdist.linux-x86_64 / egg / basemodule / entitymodule / base.py ", строка 12, вФайл "Буй"ld / bdist.linux-x86_64 / egg / basemodule / contexts.py ", строка 17, в файле" /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/lib/spark/python / lib / pyspark.zip / pyspark / conf.py ", строка 104, в init SparkContext._ensure_initialized () файле" /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2 / lib / spark / python / lib / pyspark.zip / pyspark / context.py ", строка 245, в _ensure_initialized SparkContext._gateway = gateway или файле launch_gateway ()" / opt / cloudera / parcels / CDH-5.13.1-1.cdh5.13.1.p0.2 / lib / spark / python / lib / pyspark.zip / pyspark / java_gateway.py ", строка 48, в launch_gateway SPARK_HOME = os.environ [" Файл SPARK_HOME "]"/dhcommon/dhpython/python/lib/python2.7/UserDict.py ", строка 23, в getitem повысить KeyError (ключ) KeyError: 'SPARK_HOME'
Ниже приведенокоманда spark_submit для запуска кода:
spark-submit --master yarn - кластер режима развертывания --driver-class-path postgresql-42.2.4.jre6.jar --jars spark-csv_2.10-1.4.0.jar, Обще-CSV-1.4.jar, PostgreSQL-42.2.4.jre6.jar --py-files project.egg driver_file.py
В обоих вышеописанных сценариях файл load_utils, содержащий метод load_tab_postgres, будет включен в project.egg.