Spark: UDF исполняется много раз - PullRequest
6 голосов
/ 04 ноября 2019

У меня есть фрейм данных со следующим кодом:

def test(lat: Double, lon: Double) = {
  println(s"testing ${lat / lon}")
  Map("one" -> "one", "two" -> "two")
}

val testUDF = udf(test _)

df.withColumn("test", testUDF(col("lat"), col("lon")))
  .withColumn("test1", col("test.one"))
  .withColumn("test2", col("test.two"))

Теперь, проверяя журналы, я обнаружил, что для каждой строки UDF выполняется 3 раза. Если я добавлю «test3» из столбца «test.three», UDF будет выполнен еще раз.

Может кто-нибудь объяснить, почему?

Можно ли этого избежать должным образом (без кэшированиядатафрейм после "test" добавляется, даже если это работает)?

1 Ответ

5 голосов
/ 04 ноября 2019

Если вы хотите избежать нескольких обращений к udf (что полезно, особенно если udf является узким местом в вашей работе), вы можете сделать это следующим образом:

val testUDF = udf(test _).asNondeterministic()

В основном вы говорите Spark, что вашфункция не является детерминированной, и теперь Spark гарантирует, что она вызывается только один раз, потому что небезопасно вызывать ее несколько раз (каждый вызов может возвращать разные результаты).

Также следует помнить, что этот прием не бесплатенвыполняя это, вы накладываете некоторые ограничения на оптимизатор, одним из побочных эффектов которого является, например, то, что оптимизатор Spark не проталкивает фильтры через выражения, которые не являются детерминированными, поэтому вы становитесь ответственными за оптимальное расположение фильтров в вашем запросе.

...