Как написать кодировщик набора данных для поддержки сопоставления функции с организацией. apache .spark. sql .Dataset [String] в Scala Spark - PullRequest
0 голосов
/ 26 марта 2020

Переход от Spark 1.6 к Spark 2.2 * привел к ошибке «error: Невозможно найти кодировщик для типа, хранящегося в наборе данных». Примитивные типы (Int, String и т. Д. c) »при попытке применить метод к набору данных, возвращенному из запросов к паркетной таблице. Я упростил мой код, чтобы продемонстрировать ту же ошибку. Код запрашивает файл паркета для возврата следующего типа данных: 'org. apache .spark. sql .Dataset [org. apache .spark. sql .Row]' Я применяю функцию для извлечения строки и целое число, возвращающее строку. Возвращает следующий тип данных: Array [String] Далее мне нужно выполнить обширные манипуляции, требующие отдельной функции. В этой тестовой функции я пытаюсь добавить строку, выдающую ту же ошибку, что и мой подробный пример. Я пробовал некоторые примеры кодировщика и использование «case», но не нашел подходящего решения. Буду признателен за любые предложения / примеры

scala> var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, 
cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]

val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x" 
(1)+","+s.getDecimal(1);

scala> var d2 =  d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]

scala> d2.take(1)
20/03/25 19:01:08 WARN TaskSetManager: Stage 3 contains a task of very large size (131 KB). The 
maximum recommended task size is 100 KB.
res10: Array[String] = Array(ab04006000504304,1522194407.95162)

scala> def dd(s:String){
 | s + "some string"
 | }
dd: (s: String)Unit

scala> var d3 = d2.map{s=> dd(s) }
<console>:47: error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, 
String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support 
for serializing other types will be added in future releases.

Для дальнейшей диагностики проблемы, я считаю, что этот сценарий (хотя я не пробовал все возможные решения) можно упростить до следующего кода:

scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def f(s: String){
 | s + "hi"
 | }
f: (s: String)Unit

scala> var test2 = test.map{ s => f(s) }
<console>:42: error: Unable to find encoder for type stored in a Dataset.  
Primitive types (Int, String, etc) and Product types (case classes) are 
supported by importing spark.implicits._  Support for serializing other types 
will be added in future releases.
   var test2 = test.map{ s => f(s) }

Ответы [ 2 ]

0 голосов
/ 28 марта 2020

Первое решение не работает с моим начальным (производственным) набором данных, скорее выдает ошибку "org. apache .spark.SparkException: Task not serializable" (интересно, хотя оба хранятся в одном типе данных (org. apache .spark. sql .Dataset [String] = [value: string]), который я считаю связанным. Я включил еще одно решение в свой набор тестовых данных, которое устраняет первоначальную ошибку энкодера и, как показано, фактически работает на моя проблема с игрушкой не связана с производственным набором данных. Я немного сбит с толку относительно того, почему именно мое приложение отодвинуто в сторону от перехода с версии 1.6 на 2.3, так как мне не приходилось делать какие-то специальные приспособления для моего приложения в течение многих лет и успешно запустил его для вычислений, которые, скорее всего, исчисляются триллионами. Другие исследования включали в себя упаковку моего метода в Serializable, исследования ключевого слова @transient, использование "org. apache .spark.serializer.KryoSerializer", написание моих методов. как функции и меняя все переменные на 'vals' понижение связанных сообщений в «стеке»).

scala>  import spark.implicits._
import spark.implicits._

scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def f(s: String): String = {
 |   val r = s + "hi"
 |   return r
 |   }
 f: (s: String)String

 scala> var d2 =  test.map{s => f(s)}(Encoders.STRING)
 d2: org.apache.spark.sql.Dataset[String] = [value: string]

 scala> d2.take(1)
 res0: Array[String] = Array(just some wordshi)

scala>

0 голосов
/ 28 марта 2020

У меня есть решение по крайней мере для моей упрощенной проблемы (ниже). Я буду тестировать больше ....

scala> var test = ( 1 to 3).map( _ => "just some words").toDS()
test: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def f(s: String): String = {
 | val r = s + "hi"
 | return r
 | }
f: (s: String)String

scala> var test2 = test.rdd.map{ s => f(s) }
test2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at map at <console>:43

scala> test2.take(1)
res9: Array[String] = Array(just some wordshi)
...