Как использовать функцию преобразования высшего порядка? - PullRequest
0 голосов
/ 13 декабря 2018

Это примерно transform функция высшего порядка (https://issues.apache.org/jira/browse/SPARK-23908).

Есть ли способ использовать ее как стандартную функцию (в пакете org.apache.spark.sql.functions._)?

У меня естьмассив строк, и я хочу применить нормализацию URI к каждой из них. На данный момент я сделал это с UDF. Я просто прыгнул, что с искрой 2.4.0, я мог бы пропустить UDF.

Как явидите, его следует использовать в selectExpr, как df.selectExpr("transform(i, x -> x + 1)"), но предназначен ли он только для selectExpr?

Если использовать его таким образом, есть ли в любом случае пользовательская функция для преобразования?Есть ли способ добиться этого или я должен прибегнуть к использованию старых добрых UDF?

1 Ответ

0 голосов
/ 13 декабря 2018

Есть ли в любом случае использовать его как стандартную функцию, расположенную в пакете org.apache.spark.sql.functions._?

На данный момент она предназначена только для использования с SQLвыражений, хотя если вы хотите вернуть Column ваше использование expr:

org.apache.spark.sql.functions._

expr("transform(i, x -> x + 1)"): Column

Используя его таким образом, есть ли в любом случае предоставить пользовательскую функцию для преобразования?

Можно использовать Scala UDF *:

spark.udf.register("f", (x: Int) => x + 1)

Seq((1, Seq(1, 2, 3))).toDF("id", "xs")
  .withColumn("xsinc", expr("transform(xs, x -> f(x))"))
  .show
+---+---------+---------+
| id|       xs|    xsinc|
+---+---------+---------+
|  1|[1, 2, 3]|[2, 3, 4]|
+---+---------+---------+

, хотя это не дает никаких реальных преимуществ по сравнению с UDF, принимающим Seq.


* Частичная поддержка UDF в Python, кажется, уже на месте (udfs распознаются, типы правильно выводятся, а вызовы отправляются), но с 2.4.0 механизм сериализации, похоже, работаетне работает (все записи передаются в UDF как None):

from typing import Optional
from pyspark.sql.functions import expr

sc.version
'2.4.0'
def f(x: Optional[int]) -> Optional[int]:
    return x + 1 if x is not None else None

spark.udf.register('f', f, "integer")

df = (spark
    .createDataFrame([(1, [1, 2, 3])], ("id", "xs"))
    .withColumn("xsinc", expr("transform(xs, x -> f(x))")))

df.printSchema()
root
 |-- id: long (nullable = true)
 |-- xs: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- xsinc: array (nullable = true)
 |    |-- element: integer (containsNull = true)
df.show()
+---+---------+-----+
| id|       xs|xsinc|
+---+---------+-----+
|  1|[1, 2, 3]| [,,]|
+---+---------+-----+

Конечно, здесь нет никакого реального повышения производительности -отправляется на BasePythonRunner, поэтому накладные расходы должны быть такими же, как на обычном udf.

Связанный билет JIRA SPARK-27052 - Использование PySpark udf в преобразовании дает значения NULL

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...