Перемещение существующего приложения из Spark 1.6 в Spark 2.2 * привело (в конечном итоге) к ошибке «org. apache .spark.SparkException: задача не сериализуема». Я упростил мой код, чтобы продемонстрировать ту же ошибку. Код запрашивает файл паркета, чтобы вернуть следующий тип данных: 'org. apache .spark. sql .Dataset [org. apache .spark. sql .Row]' Я применяю функцию для извлечения строки и целое число, возвращающее строку. Неотъемлемая проблема связана с тем, что Spark 2.2 возвращает набор данных, а не фрейм данных. (см. предыдущий пост о предварительных ошибках) Как написать кодировщик набора данных для поддержки сопоставления функции с организацией. apache .spark. sql .Dataset [String] в Scala Spark
var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]
scala> val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x")(1)+","+s.getDecimal(1);
parseCVDP_parquet: org.apache.spark.sql.Row => String = <function1>
scala> var d2 = d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]
scala> def dd(s:String, start: Int) = { s + "some string" }
dd: (s: String, start: Int)String
scala> var d3 = d2.map{s=> dd(s,5) }
d3: org.apache.spark.sql.Dataset[String] = [value: string]
scala> d3.take(1)
org.apache.spark.SparkException: Task not serializable
Мое текущее решение этой проблемы - встраивание встроенного кода (см. Ниже), но оно не практично, так как мой производственный код имеет существенные параметры и функции, которые задействованы. Я также попытался преобразовать данные во фрейм данных (как это было в версии 1.6) и варианты определений функций, которые не оказались работоспособным решением.
scala> var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]
scala> val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x")(1)+","+s.getDecimal(1);
parseCVDP_parquet: org.apache.spark.sql.Row => String = <function1>
scala> var d2 = d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]
scala> var d3 = d2.map{s=> { s + "some string" } }
d3: org.apache.spark.sql.Dataset[String] = [value: string]
scala> d3.take(1)
20/04/30 15:16:17 WARN TaskSetManager: Stage 0 contains a task of very large size (132 KB). The maximum recommended task size is 100 KB.
res1: Array[String] = Array(761f006000705904,1521833533.96682some string)