У меня есть два файла.
У меня есть файл, в котором я создаю sparkcontext.
create_spark.py
Код такой
spark_conf = (SparkConf().setAppName(app_name)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.task.maxFailures", "14")
.set("spark.port.maxRetries", "50")
.set("spark.yarn.max.executor.failures", "14"))
spark_context = SparkContext(conf=spark_conf)
sqlContext=HiveContext(spark_context)
Тогда есть еще один файл, который содержит все коды.назовите его: function_file.py
он должен выполнять следующие функции: эта функция просто выполняет некоторую операцию с данными.
def adjust_name(line):
if line is not None:
if "(" in line:
if "\(" in line:
tem1 = line.split("\(")
return tem1[0]
else:
tem1 = line.split("(")
return tem1[0]
else:
return line
else:
return line
Теперь мы создаем udf функции adjust_name
как.
adjust=udf(adjust_name,StringType())
и мы используем этот udf в функциях process_sql как
и другую функцию, которая выполняет загрузку всей таблицы и все.для примера e
def process_sql(sqlContext,source_db,processing_db,table_name):
.
.
.df3 = df3.withColumn('org_name',trim(adjust(df3['col_name'])))
return table_name.
и теперь в файле create_spark.py я импортирую файл_функции как модуль.и я вызываю функцию process_sql как
x= function_file.process_sql(sqlContext,source_db,processing_db,table_name)
, все аргументы определены заранее.Но я получаю сообщение об ошибке:
ValueError: Невозможно запустить несколько SparkContexts одновременно;существующий SparkContext (), созданный udf по адресу function_file.py
ПРИМЕЧАНИЕ: я ограничен использованием только spark 1.6
РЕДАКТИРОВАТЬ: я получил подсказку, UDF создает sparkcontext еще доМой файл create_spark.py создается.
:Connecting to Spark and creating context with dim_emp_atsc_test_4_sept spark_context = SparkContext(conf=spark_conf)
ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=process_handler.py, master=yarn-client) created by udf at ..