схема эволюции сложных типов - PullRequest
2 голосов
/ 04 апреля 2019

Каков статус эволюции схемы для arrays из structs (сложных типов) в искре?

Я знаю, что для ORC или Parquet для обычных простых типов работает довольно хорошо (добавление нового столбца), но я не смог найти никакой документации для моего желаемого случая.

Мой вариант использования должен иметь структуру, подобную этой:

user_id,date,[{event_time, foo, bar, baz, tag1, tag2, ... future_tag_n}, ...]

И я хочу иметь возможность добавлять новые поля в структуру массива.

Может ли комплексный тип Map (ключ-значение) привести к неэффективности? Там я бы по крайней мере был уверен, что добавление новых полей (тегов) будет гибким.

редактировать

case class BarFirst(baz:Int, foo:String)
case class BarSecond(baz:Int, foo:String, moreColumns:Int, oneMore:String)
case class BarSecondNullable(baz:Int, foo:String, moreColumns:Option[Int], oneMore:Option[String])
case class Foo(i:Int, date:String, events:Seq[BarFirst])
case class FooSecond(i:Int, date:String, events:Seq[BarSecond])
case class FooSecondNullable(i:Int, date:String, events:Seq[BarSecondNullable])
val dfInitial = Seq(Foo(1, "2019-01-01", Seq(BarFirst(1, "asdf")))).toDF
dfInitial.printSchema
dfInitial.show

root
 |-- i: integer (nullable = false)
 |-- date: string (nullable = true)
 |-- events: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- baz: integer (nullable = false)
 |    |    |-- foo: string (nullable = true)


scala> dfInitial.show
+---+----------+----------+
|  i|      date|    events|
+---+----------+----------+
|  1|2019-01-01|[[1,asdf]]|
+---+----------+----------+

dfInitial.write.partitionBy("date").parquet("my_df.parquet")

tree my_df.parquet
my_df.parquet
├── _SUCCESS
└── date=2019-01-01
    └── part-00000-fd77f730-6539-4b51-b680-b7dd5ffc04f4.c000.snappy.parquet


val evolved = Seq(FooSecond(2, "2019-01-02", Seq(BarSecond(1, "asdf", 11, "oneMore")))).toDF
evolved.printSchema
evolved.show

scala> evolved.printSchema
root
 |-- i: integer (nullable = false)
 |-- date: string (nullable = true)
 |-- events: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- baz: integer (nullable = false)
 |    |    |-- foo: string (nullable = true)
 |    |    |-- moreColumns: integer (nullable = false)
 |    |    |-- oneMore: string (nullable = true)


scala> evolved.show
+---+----------+--------------------+
|  i|      date|              events|
+---+----------+--------------------+
|  1|2019-01-02|[[1,asdf,11,oneMo...|
+---+----------+--------------------+

import org.apache.spark.sql._
evolved.write.mode(SaveMode.Append).partitionBy("date").parquet("my_df.parquet")
my_df.parquet
├── _SUCCESS
├── date=2019-01-01
│   └── part-00000-fd77f730-6539-4b51-b680-b7dd5ffc04f4.c000.snappy.parquet
└── date=2019-01-02
    └── part-00000-64e65d05-3f33-430e-af66-f1f82c23c155.c000.snappy.parquet

val df = spark.read.parquet("my_df.parquet")
df.printSchema
scala> df.printSchema
root
 |-- i: integer (nullable = true)
 |-- events: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- baz: integer (nullable = true)
 |    |    |-- foo: string (nullable = true)
 |-- date: date (nullable = true)

дополнительные столбцы отсутствуют! Почему?

df.show
df.as[FooSecond].collect // AnalysisException: No such struct field moreColumns in baz, foo
df.as[FooSecondNullable].collect // AnalysisException: No such struct field moreColumns in baz, foo

Это поведение было оценено для свечей 2.2.3_2.11 и 2.4.2_2.12.

1 Ответ

1 голос
/ 26 апреля 2019

При выполнении кода после после редактирования (см. Выше) объединение схем отключено и новые столбцы не загружаются.При включении схемы слияния:

val df = spark.read.option("mergeSchema", "true").parquet("my_df.parquet")
scala> df.printSchema
root
 |-- i: integer (nullable = true)
 |-- events: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- baz: integer (nullable = true)
 |    |    |-- foo: string (nullable = true)
 |    |    |-- moreColumns: integer (nullable = true)
 |    |    |-- oneMore: string (nullable = true)
 |-- date: date (nullable = true)

df.as [FooSecond] .collect // очевидно, что происходит сбой. NullPointerException должен использовать параметр df.as [FooSecondNullable] .collect // работает нормально

, теперь используяhive

evolved.write.mode(SaveMode.Append).partitionBy("date").saveAsTable("my_df")

, кажется, работает нормально (без исключения), но при попытке прочитать данные обратно в:

spark.sql("describe my_df").show(false)
+-----------------------+---------------------------------+-------+
|col_name               |data_type                        |comment|
+-----------------------+---------------------------------+-------+
|i                      |int                              |null   |
|events                 |array<struct<baz:int,foo:string>>|null   |
|date                   |string                           |null   |
|# Partition Information|                                 |       |
|# col_name             |data_type                        |comment|
|date                   |string                           |null   |
+-----------------------+---------------------------------+-------+

, когда вместо массива структур используются только базовые типы:

val first = Seq(Foo(1, "2019-01-01")).toDF
first.printSchema
first.write.partitionBy("dt").saveAsTable("df")
val evolved = Seq(FooEvolved(1,2, "2019-01-02")).toDF
evolved.printSchema
evolved.write.mode(SaveMode.Append).partitionBy("dt").saveAsTable("df")
evolved.write.mode(SaveMode.Append).partitionBy("dt").saveAsTable("df")
org.apache.spark.sql.AnalysisException: The column number of the existing table default.df(struct<first:int,dt:string>) doesn't match the data schema(struct<first:int,second:int,dt:string>);

есть четкое сообщение об ошибке Вопрос: возможно ли еще развить схему в Hive?Или требуется ручная адаптация схемы?

Заключение

Поддерживается эволюция схемы для массивов структур, но при чтении файлов необходимо включить параметр слияния, и кажется, что он работает изполе только при прямом чтении файлов без Hive.

При чтении из улья возвращается только старая схема, как при записи новых столбцов, по-видимому, автоматически отбрасывается.

Эволюция схемыв формате партера (создание видов вручную, дополнительное преимущество, заключающееся в том, что паркет не поддерживает эволюцию схемы (возможно переименование, изменение типа данных)), выглядит интересной альтернативой, так как для параметра слияния-схемы, установленного в значение true, достаточно много ресурсов, и он работает длявсе движки SQL в Hadoop.

...