Ошибка не сериализуемой задачи при вызове udf для запуска фрейма данных - PullRequest
0 голосов
/ 16 марта 2020

У меня есть функция scala для шифрования, затем я создал из нее udf и передал ее одному из столбцов в моем фрейме данных als_embeddings, чтобы добавить новый столбец в мой фрейм данных.

import java.util.Base64
import javax.crypto.Cipher
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}

val Algorithm = "AES/CBC/PKCS5Padding"
val Key = new SecretKeySpec(Base64.getDecoder.decode("BiwHeIqzQa8X6MXtdg/hhQ=="), "AES")
val IvSpec = new IvParameterSpec(new Array[Byte](16))

def encrypt(text: String): String = {
  val cipher = Cipher.getInstance(Algorithm)
  cipher.init(Cipher.ENCRYPT_MODE, Key, IvSpec)

  new String(Base64.getEncoder.encode(cipher.doFinal(text.getBytes("utf-8"))), "utf-8")
}


val encryptUDF = udf((uid : String) => encrypt(uid))

проходя выше encryptUDF в мой искровой фрейм данных, чтобы создать новый столбец с зашифрованным uid

val als_encrypt_embeddings = als_embeddings.withColumn("encrypt_uid",encryptUDF(col("uid")))
als_encrypt_embeddings.show()

, но когда я делаю это, он выдает ошибку ниже:

Исключение в потоке "главная" организация. apache .spark.SparkException: задача не сериализуема

что мне здесь не хватает.

1 Ответ

1 голос
/ 17 марта 2020

Сообщение об ошибке Task not serializable правильное, но не очень четкое. Далее в трассировке стека приведено более подробное объяснение того, что пошло не так:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
[...]
Caused by: java.io.NotSerializableException: javax.crypto.spec.IvParameterSpec
Serialization stack:
    - object not serializable (class: javax.crypto.spec.IvParameterSpec, value: javax.crypto.spec.IvParameterSpec@7d4d65f5)
    - field (class: Starter$$anonfun$1, name: IvSpec$1, type: class javax.crypto.spec.IvParameterSpec)
    - object (class Starter$$anonfun$1, <function1>)
    - element of array (index: 2)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13, <function2>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
    ... 48 more

В части Caused by трассировки стека Spark сообщает, что не смог сериализовать экземпляр javax.crypto.spec.IvParameterSpec.

ParameterSpe c создан в JVM драйвера во время выполнения udf на одном из исполнителей. Поэтому объект должен быть сериализован, чтобы переместить его в виртуальную машину исполнителя. Поскольку объект не сериализуем, попытка его перемещения не удалась.

Самый простой способ решить эту проблему - создать объекты, необходимые для шифрования, непосредственно в виртуальной машине исполнителя путем перемещения блока кода в закрытие udf. :

val encryptUDF = udf((uid : String) => {
  val Algorithm = "AES/CBC/PKCS5Padding"
  val Key = new SecretKeySpec(Base64.getDecoder.decode("BiwHeIqzQa8X6MXtdg/hhQ=="), "AES")
  val IvSpec = new IvParameterSpec(new Array[Byte](16))

  def encrypt(text: String): String = {
    val cipher = Cipher.getInstance(Algorithm)
    cipher.init(Cipher.ENCRYPT_MODE, Key, IvSpec)

    new String(Base64.getEncoder.encode(cipher.doFinal(text.getBytes("utf-8"))), "utf-8")
  }
  encrypt(uid)
})

Таким образом, все объекты будут создаваться напрямую в виртуальной машине-исполнителе.

Недостатком этого подхода является то, что для каждого вызова udf создается один набор объектов шифрования. , Это может вызвать проблемы с производительностью, если создание экземпляров этих объектов стоит дорого. Одним из вариантов будет использование mapPartitions вместо udf. В этом answer mapPartitions используется для того, чтобы избежать создания слишком большого количества дорогостоящих соединений с базой данных при итерации по фрейму данных. Этот подход также может быть использован здесь.

...