Почему Spark выводит двоичный файл вместо массива [байт] при создании DataFrame? - PullRequest
0 голосов
/ 31 октября 2018

В принципе, у меня есть DataFrame, который состоит из "Имя" и "Значения" полей. Первое поле - String, а второе - Array[Byte].

Что я хочу сделать с каждой записью этого DataFrame, так это применить любую функцию, используя UDF, и создать новый столбец. Это прекрасно работает, когда "Значения" является Array[Int]. Однако, когда это Array[Byte], появляется следующая ошибка:

org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(Values)' due to data type mismatch: argument 1 requires array<tinyint> type, however, '`Values`' is of binary type.;;
'Project [Name#47, Values#48, UDF(Values#48) AS TwoTimes#56]
+- Project [_1#44 AS Name#47, _2#45 AS Values#48]
+- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true) AS _1#44, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#45]
  +- ExternalRDD [obj#43]

Полный код следующий:

scala> val df1 = spark.sparkContext.parallelize(Seq(("one", Array[Byte](1, 2, 3, 4, 5)), ("two", Array[Byte](6, 7, 8, 9, 10)))).toDF("Name", "Values")
df1: org.apache.spark.sql.DataFrame = [Name: string, Values: binary]

scala> df1.show
+----+----------------+
|Name|          Values|
+----+----------------+
| one|[01 02 03 04 05]|
| two|[06 07 08 09 0A]|
+----+----------------+

scala> val twice = udf { (values: Seq[Byte]) =>
   |     val result = Array.ofDim[Byte](values.length)
   |     for (i <- values.indices)
   |         result(i) = (2 * values(i).toInt).toByte
   |     result
   | }
twice: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,BinaryType,Some(List(ArrayType(ByteType,false))))

scala> val df2 = df1.withColumn("TwoTimes", twice('Values))

Я понимаю, что такая ошибка возникает из-за неправильного типа данных (ожидается: Array[Byte], однако он находит Binary), но я не понимаю, почему Spark вывел мой Array[Byte] как Binary. Может кто-нибудь объяснить мне, пожалуйста?

Если бы мне пришлось использовать Binary вместо Array[Byte], как мне справиться с этим в моем UDF?

Я уточняю, что мой оригинальный UDF не использует тривиальный цикл for. Я понимаю, что в этом примере это можно заменить методом map.

1 Ответ

0 голосов
/ 31 октября 2018

В Spark Array[Byte] представляется как BinaryType. Из документации видно:

открытый класс BinaryType расширяет DataType
Тип данных, представляющий Array[Byte] значений. Пожалуйста, используйте синглтон DataTypes.BinaryType.

Следовательно, Array[Byte] и Binary одинаковы, однако есть некоторые различия между ними и Seq[Byte], что приводит к ошибке.

Чтобы устранить проблему, просто замените Seq[Byte] на Array[Byte] в формате udf:

val twice = udf { (values: Array[Byte]) =>
  val result = Array.ofDim[Byte](values.length)
  for (i <- values.indices)
    result(i) = (2 * values(i).toInt).toByte
  result
}
...