Я тестирую Spark 2.4.0 новые функции from_avro и to_avro.
Я создаю фрейм данных только с одним столбцом и тремя строками, сериализую его с помощью avro и десериализую обратно из avro.
Если входной набор данных создан как
val input1 = Seq("foo", "bar", "baz").toDF("key")
+---+
|key|
+---+
|foo|
|bar|
|baz|
+---+
десериализация просто возвращает N копий последней строки:
+---+
|key|
+---+
|baz|
|baz|
|baz|
+---+
Если я создаю входной набор данных как
val input2 = input1.sqlContext.createDataFrame(input1.rdd, input1.schema)
, результаты верны:
+---+
|key|
+---+
|foo|
|bar|
|baz|
+---+
Пример кода:
import org.apache.spark.sql.avro.{SchemaConverters, from_avro, to_avro}
import org.apache.spark.sql.DataFrame
val input1 = Seq("foo", "bar", "baz").toDF("key")
val input2 = input1.sqlContext.createDataFrame(input1.rdd, input1.schema)
def test_avro(df: DataFrame): Unit = {
println("input df:")
df.printSchema()
df.show()
val keySchema = SchemaConverters.toAvroType(df.schema).toString
println(s"avro schema: $keySchema")
val avroDf = df
.select(to_avro($"key") as "key")
println("avro serialized:")
avroDf.printSchema()
avroDf.show()
val output = avroDf
.select(from_avro($"key", keySchema) as "key")
.select("key.*")
println("avro deserialized:")
output.printSchema()
output.show()
}
println("############### testing .toDF()")
test_avro(input1)
println("############### testing .createDataFrame()")
test_avro(input2)
Результат:
############### testing .toDF()
input df:
root
|-- key: string (nullable = true)
+---+
|key|
+---+
|foo|
|bar|
|baz|
+---+
avro schema: {"type":"record","name":"topLevelRecord","fields":[{"name":"key","type":["string","null"]}]}
avro serialized:
root
|-- key: binary (nullable = true)
+----------------+
| key|
+----------------+
|[00 06 66 6F 6F]|
|[00 06 62 61 72]|
|[00 06 62 61 7A]|
+----------------+
avro deserialized:
root
|-- key: string (nullable = true)
+---+
|key|
+---+
|baz|
|baz|
|baz|
+---+
############### testing .createDataFrame()
input df:
root
|-- key: string (nullable = true)
+---+
|key|
+---+
|foo|
|bar|
|baz|
+---+
avro schema: {"type":"record","name":"topLevelRecord","fields":[{"name":"key","type":["string","null"]}]}
avro serialized:
root
|-- key: binary (nullable = true)
+----------------+
| key|
+----------------+
|[00 06 66 6F 6F]|
|[00 06 62 61 72]|
|[00 06 62 61 7A]|
+----------------+
avro deserialized:
root
|-- key: string (nullable = true)
+---+
|key|
+---+
|foo|
|bar|
|baz|
+---+
Из теста кажется, что проблема в фазе десериализации, так как при печати сериализованного df avro отображаются разные строки.
Я делаю это неправильно или есть ошибка?