Как остановить Spark для разрешения столбца UDF в условном выражении - PullRequest
0 голосов
/ 18 октября 2019

Я хочу выполнить некоторое условное ветвление, чтобы избежать вычисления ненужных узлов, но я замечаю, что если исходный столбец в выражении условия является UDF, то в противном случае разрешается независимо от:

 @pandas_udf("double", PandasUDFType.SCALAR)
 def udf_that_throws_exception(*cols):
   raise Exception('Error')

 @pandas_udf("int", PandasUDFType.SCALAR)
 def simple_mul_udf(*cols):
   result = cols[0]
   for c in cols[1:]:
     result *= c
   return result

 df = spark.range(0,5)

 df = df.withColumn('A', lit(1))
 df = df.withColumn('B', lit(2))

 df = df.withColumn('udf', simple_mul('A','B'))
 df = df.withColumn('sql', expr('A*B'))

 df = df.withColumn('res', when(df.sql < 100, lit(1)).otherwise(udf_that_throws(lit(0))))

Приведенный выше код работает должным образом, оператор в этом случае всегда верен, поэтому мой UDF, который выдает исключение, никогда не вызывается.

Однако, если я изменяю условие на использование df.udf вместо этого внезапно вызывается UDF в противном случае, и я получаю исключение, даже если результат условия не изменился.

Я думал, что смогу скрыть его, удалив UDF из условияоднако тот же результат происходит независимо от того:

  df = df.withColumn('cond', when(df.udf < 100, lit(1)).otherwise(lit(0)))
  df = df.withColumn('res', when(df.cond == lit(1), lit(1)).otherwise(udf_that_throws_exception(lit(0))))

Я полагаю, это как-то связано с тем, как Spark оптимизирует, что хорошо, но я ищу какой-либо способ сделать это, не неся затрат. Есть идеи?

Редактировать По запросу для получения дополнительной информации. Мы пишем обработчик, который может принять произвольную модель, и код генерирует график. По пути есть моменты, когда мы принимаем решения на основе состояния значений во время выполнения. Мы интенсивно используем панд UDF. Итак, представьте ситуацию, когда у нас есть несколько путей в графе и, в зависимости от некоторых условий во время выполнения, мы хотим следовать одному из этих путей, оставляя все остальные нетронутыми.

Я хотел бы закодировать эту логику вграфик, поэтому нет смысла собирать и переходить в коде.

Пример кода, который я предоставил, предназначен только для демонстрационных целей. Проблема, с которой я сталкиваюсь, заключается в том, что если столбец, используемый в операторе IF, является UDF или, по-видимому, если он является производным от UDF, то условие OTHERWISE всегда выполняется, даже если оно фактически никогда не использовалось. Если IF / ELSE являются дешевыми операциями, такими как литералы, я бы не возражал, но что если столбец UDF (возможно, с обеих сторон) приведет к большой агрегации или некоторому другому процессу длины, который фактически просто отбрасывается?

1 Ответ

0 голосов
/ 19 октября 2019

В PySpark UDF вычисляются заранее, и поэтому вы получаете это неоптимальное поведение. Это также видно из плана запроса:

== Physical Plan ==
*(2) Project [id#753L, 1 AS A#755, 2 AS B#758, pythonUDF1#776 AS udf#763, CASE WHEN (pythonUDF1#776 < 100) THEN 1.0 ELSE pythonUDF2#777 END AS res#769]
+- ArrowEvalPython [simple_mul_udf(1, 2), simple_mul_udf(1, 2), udf_that_throws_exception(0)], [id#753L, pythonUDF0#775, pythonUDF1#776, pythonUDF2#777]
   +- *(1) Range (0, 5, step=1, splits=8)

Оператор ArrowEvalPython отвечает за вычисление пользовательских функций, после чего условие будет оцениваться в операторе Project.

Причина, по которой вы вызываете другое поведение при вызове df.sql в вашем состоянии (оптимальное поведение), заключается в том, что это особый случай, в котором значение в этом столбце является постоянным (оба столбца Aи B являются постоянными), и оптимизатор Spark может оценить его заранее (в драйвере во время обработки плана запроса, до выполнения фактического задания в кластере), и, таким образом, он знает, что ветвь условия otherwise условия никогда не будетдолжны быть оценены. Если значение в этом столбце sql является динамическим (например, как в столбце id), поведение снова будет неоптимальным, поскольку Spark заранее не знает, что часть otherwise никогда не должна иметь место.

Если вы хотите избежать этого неоптимального поведения (вызывая udf в otherwise, даже если оно не требуется), одним из возможных решений является то, что вы оцениваете это условие внутри вашего udf, например, следующим образом:

@pandas_udf("int", PandasUDFType.SCALAR)
def udf_with_cond(*cols):
    result = cols[0]
    for c in cols[1:]:
        result *= c

    if((result < 100).any()):
        return result
    else:
        raise Exception('Error')

df = df.withColumn('res', udf_with_cond('A', 'B'))
...