Вкратце: Я запускаю приложение pySpark на EMR AWS.Когда я сопоставляю rdd с помощью пользовательской функции, которая находится во внешнем модуле во внешнем пакете (поставляется внутри .zip-файла в виде --py-files), кластер застревает - состояние «Выполнение» сохраняется, пока не появятся больше строк журнала, покаЯ вручную завершаю его.
Чем это не является: Это не является правильным исключением импорта - так как это привело бы к завершению приложения при выполнении строк импорта, вызывая соответствующее исключение, которое делаетне произойдет.Также, как видно ниже, вызов функции, которая сопоставляется с лямбда-функцией, аналогичной той, когда вызываемая функция находится в «проблемном» модуле, работает.
Что это такое: Ошибка возникает только тогда, когда программа пытается использовать функцию из этого модуля в качестве функции отображения в преобразовании, записанном в основной программе.Кроме того, если я удаляю строку импорта, выделенную во внешнем файле («проблемный» модуль) - импорт, который никогда не используется там в этом минимальном контексте воспроизведения ошибок (но в реальном контексте он используется) - ошибка перестаетСуществовать.
Ниже приведен код для минимального примера ошибки, включая комментирование 2 важных строк и некоторую техническую информацию.Буду признателен за любую помощь.
Вот основная программа:
import spark_context_holder
from reproducing_bugs_external_package import reproducing_bugs_external_file
sc = spark_context_holder.sc
log = spark_context_holder.log
def make_nums_rdd():
return sc.parallelize([1, 2, 3] * 300).map(lambda x: x * x / 1.45)
log.warn("Starting my code!")
sum = sc.parallelize([1,2,3]*300).map(lambda x: x*x/1.45).sum()
log.warn("The calculated sum using in-line expression, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(sum))
simple_sum_rdd = make_nums_rdd()
log.warn("The calculated sum using the in-file function, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(simple_sum_rdd.sum()))
simple_sum_rdd = reproducing_bugs_external_file.make_nums_rdd(sc)
log.warn("The calculated sum using the external file's function, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(simple_sum_rdd.sum()))
simple_sum_rdd = sc.parallelize([1,2,3]*300).map(reproducing_bugs_external_file.calc_func)
log.warn("The calculated sum using the external file's mapping function, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(simple_sum_rdd.sum()))
# This last line does not get logged, while the others up until this one do. Here the cluster gets stuck on Running status without outputting any more log lines
В zip-файле, поставляемом в виде --py-files, у меня следующая структура:
> spark_context_holde.py
> reproducing_bugs_external_package
>> __init__.py
>> reproducing_bugs_external_file.py
И вот их соответствующее содержимое:
spark_context_holder.py
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("kac_walk_experiment")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
log4jLogger = sc._jvm.org.apache.log4j
log = log4jLogger.LogManager.getLogger("dbg_et")
# sc.setLogLevel("ALL")
def getParallelismAlternative():
return int(sc.getConf().get('spark.cores.max'))
__ init __. Py
from . import reproducing_bugs_external_file
__all__ = [reproducing_bugs_external_file]
reproduction_bugs_external_file.py
import numpy
import spark_context_holder # If this is removed - the bug stops!
def make_nums_rdd(sc):
return sc.parallelize([1, 2, 3] * 300).map(lambda x: x * x / 1.45)
def calc_func(x):
return x*x/1.45
Дополнительные технические данные:
- Метка выпуска: emr-5.17.0
- Распределение Hadoop: Amazon 2.8.4
- Приложения: Spark 2.3.1
- с использованием python3.4, который является версией 3, установленной на компьютерах AWS на сегодняшний день