Как правильно использовать UDF внутри другого UDF? (PySpark SQL) - PullRequest
0 голосов
/ 24 апреля 2019

Я думал, что это должно быть относительно легко сделать, но я получаю сообщение об ошибке, когда я иду, чтобы зарегистрировать свой UDF и выполнить его.

Я пишу некоторую функцию Python модульным способом, поэтому он вызывает некоторые другие функции Python(например, у меня есть одна функция Python, которая принимает столбец и является относительно простой UDF, но внутри нее должен использоваться другой UDF, я вызываю check_existence (col), который сначала выполняет SQL-запрос к таблице, чтобы проверить, является ли текущее значение этого столбцадаже существует до запуска остальной логики).

Я пытался зарегистрировать обе функции как UDF, что имеет смысл, но

def check_exists(error):
  query_string = """select count(*) as count from my_data_frame where error_code = \"{0}\"""".format(error)
  result = spark.sql(query_string).toPandas()["count"][0]
  if result:
    return True 
  else: 
    return False 

#register as a udf 
from pyspark.sql.functions import udf 
from pyspark.sql.types import ArrayType, FloatType, BooleanType
check_exists_udf = udf("check_exists",BooleanType())

функция, которую я действительно хочу использовать против таблицы, которая вызывает check_exists:

def detect_col(error_code):
  if check_exists_udf(error_code):
    return 1 
  return 0 

registerкак udf с искрой udf

spark.udf.register("detect_col_udf", lambda error_code: detect_col(error_code), StringType())
%sql

select detect_col_udf(error_code, count), error_code, count, time
from time_series_view

Фактические результаты:

Ошибка в операторе SQL: SparkException: задание прервано из-за сбоя этапа: задача 0 на этапе 218.0 не выполнена 1 раз, чаще всегонедавний сбой: Потерянная задача 0.0 на этапе 218.0 (TID 4597, localhost, драйвер исполнителя): org.apache.spark.api.python.PythonException: Traceback (последний вызов был последним): Файл "/ databricks / spark / python / pyspark /worker.py ", строка 403, в основном процессе ()

File" /databricks/spark/python/pyspark/sql/udf.py ", строка 84, в init " {0} ". Format (type (func))) TypeError: Неверная функция: не является функцией или вызываемой ( вызов не определен):

...