org. apache .spark.SparkException: задача не сериализуется. Scala Искра - PullRequest
1 голос
/ 30 апреля 2020

Перемещение существующего приложения из Spark 1.6 в Spark 2.2 * привело (в конечном итоге) к ошибке «org. apache .spark.SparkException: задача не сериализуема». Я упростил мой код, чтобы продемонстрировать ту же ошибку. Код запрашивает файл паркета, чтобы вернуть следующий тип данных: 'org. apache .spark. sql .Dataset [org. apache .spark. sql .Row]' Я применяю функцию для извлечения строки и целое число, возвращающее строку. Неотъемлемая проблема связана с тем, что Spark 2.2 возвращает набор данных, а не фрейм данных. (см. предыдущий пост о предварительных ошибках) Как написать кодировщик набора данных для поддержки сопоставления функции с организацией. apache .spark. sql .Dataset [String] в Scala Spark

var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]

scala> val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x")(1)+","+s.getDecimal(1);
parseCVDP_parquet: org.apache.spark.sql.Row => String = <function1>

scala> var d2 =  d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def dd(s:String, start: Int) = { s + "some string" }
dd: (s: String, start: Int)String

scala> var d3 = d2.map{s=> dd(s,5) }
d3: org.apache.spark.sql.Dataset[String] = [value: string]

scala> d3.take(1)
org.apache.spark.SparkException: Task not serializable

Мое текущее решение этой проблемы - встраивание встроенного кода (см. Ниже), но оно не практично, так как мой производственный код имеет существенные параметры и функции, которые задействованы. Я также попытался преобразовать данные во фрейм данных (как это было в версии 1.6) и варианты определений функций, которые не оказались работоспособным решением.

scala> var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]

scala> val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x")(1)+","+s.getDecimal(1);
parseCVDP_parquet: org.apache.spark.sql.Row => String = <function1>

scala> var d2 =  d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]

scala> var d3 = d2.map{s=> { s + "some string" } }
d3: org.apache.spark.sql.Dataset[String] = [value: string]

scala> d3.take(1)
20/04/30 15:16:17 WARN TaskSetManager: Stage 0 contains a task of very large size (132 KB). The maximum recommended task size is 100 KB.
res1: Array[String] = Array(761f006000705904,1521833533.96682some string)

1 Ответ

0 голосов
/ 03 мая 2020

org.apache.spark.SparkException: Task not serialization

Чтобы решить эту проблему, поместите все свои функции и переменные в Object. Используйте эти функции и переменные везде, где это необходимо.

Таким способом вы можете исправить большинство проблем serialization

Example

package common
object AppFunctions {
  def append(s: String, start: Int) = s"${s}some thing"
}

object ExecuteQuery {
 import common.AppFunctions._

 [...]

 val d3 = d2.map(s => append(s,5)) // Pass required values to method.

 [...]


}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...