Из-за проблемы импорта pySpark на AWS застрял в состоянии «Выполнение» - PullRequest
0 голосов
/ 02 января 2019

Вкратце: Я запускаю приложение pySpark на EMR AWS.Когда я сопоставляю rdd с помощью пользовательской функции, которая находится во внешнем модуле во внешнем пакете (поставляется внутри .zip-файла в виде --py-files), кластер застревает - состояние «Выполнение» сохраняется, пока не появятся больше строк журнала, покаЯ вручную завершаю его.

Чем это не является: Это не является правильным исключением импорта - так как это привело бы к завершению приложения при выполнении строк импорта, вызывая соответствующее исключение, которое делаетне произойдет.Также, как видно ниже, вызов функции, которая сопоставляется с лямбда-функцией, аналогичной той, когда вызываемая функция находится в «проблемном» модуле, работает.

Что это такое: Ошибка возникает только тогда, когда программа пытается использовать функцию из этого модуля в качестве функции отображения в преобразовании, записанном в основной программе.Кроме того, если я удаляю строку импорта, выделенную во внешнем файле («проблемный» модуль) - импорт, который никогда не используется там в этом минимальном контексте воспроизведения ошибок (но в реальном контексте он используется) - ошибка перестаетСуществовать.

Ниже приведен код для минимального примера ошибки, включая комментирование 2 важных строк и некоторую техническую информацию.Буду признателен за любую помощь.

Вот основная программа:

import spark_context_holder
from reproducing_bugs_external_package import reproducing_bugs_external_file


sc = spark_context_holder.sc
log = spark_context_holder.log


def make_nums_rdd():
    return sc.parallelize([1, 2, 3] * 300).map(lambda x: x * x / 1.45)

log.warn("Starting my code!")
sum = sc.parallelize([1,2,3]*300).map(lambda x: x*x/1.45).sum()
log.warn("The calculated sum using in-line expression, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(sum))
simple_sum_rdd = make_nums_rdd()
log.warn("The calculated sum using the in-file function, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(simple_sum_rdd.sum()))
simple_sum_rdd = reproducing_bugs_external_file.make_nums_rdd(sc)
log.warn("The calculated sum using the external file's function, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(simple_sum_rdd.sum()))
simple_sum_rdd = sc.parallelize([1,2,3]*300).map(reproducing_bugs_external_file.calc_func)
log.warn("The calculated sum using the external file's mapping function, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(simple_sum_rdd.sum()))
# This last line does not get logged, while the others up until this one do. Here the cluster gets stuck on Running status without outputting any more log lines

В zip-файле, поставляемом в виде --py-files, у меня следующая структура:

> spark_context_holde.py
> reproducing_bugs_external_package
  >> __init__.py
  >> reproducing_bugs_external_file.py

И вот их соответствующее содержимое:

spark_context_holder.py

from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("kac_walk_experiment")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
log4jLogger = sc._jvm.org.apache.log4j
log = log4jLogger.LogManager.getLogger("dbg_et")

# sc.setLogLevel("ALL")

def getParallelismAlternative():
    return int(sc.getConf().get('spark.cores.max'))

__ init __. Py

from . import reproducing_bugs_external_file

__all__ = [reproducing_bugs_external_file]

reproduction_bugs_external_file.py

import numpy
import spark_context_holder  # If this is removed - the bug stops!


def make_nums_rdd(sc):
    return sc.parallelize([1, 2, 3] * 300).map(lambda x: x * x / 1.45)


def calc_func(x):
    return x*x/1.45

Дополнительные технические данные:

  • Метка выпуска: emr-5.17.0
  • Распределение Hadoop: Amazon 2.8.4
  • Приложения: Spark 2.3.1
  • с использованием python3.4, который является версией 3, установленной на компьютерах AWS на сегодняшний день

1 Ответ

0 голосов
/ 16 января 2019

Я думаю, что ваша фундаментальная проблема в том, что вы берете кучу установочного кода Pyspark, который предназначен для запуска только на главном узле, и вместо этого запускаете его на подчиненных узлах. Там нет причин, почему эти строки:

from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("kac_walk_experiment")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
log4jLogger = sc._jvm.org.apache.log4j
log = log4jLogger.LogManager.getLogger("dbg_et")

должен быть во внешнем модуле, и он определенно не должен быть в модуле, который вы архивируете и экспортируете на подчиненные узлы через --py-files. Это, безусловно, вызовет много неопределенного поведения, возможно, вплоть до зависшей ошибки, которую вы получаете.

Переместите вышеуказанные строки в вашу основную программу, и все должно быть в порядке. Вам также придется переписать любой оставшийся код в spark_context_holder.py для соответствия. Например, getParallelismAlternative будет принимать sc в качестве аргумента:

def getParallelismAlternative(sc):
    return int(sc.getConf().get('spark.cores.max'))
...