Поврежденные строки, записанные в __HIVE_DEFAULT_PARTITION__ при попытке перезаписать раздел Hive - PullRequest
0 голосов
/ 02 октября 2018

Я вижу очень странное поведение при попытке перезаписи раздела в таблице Hive с помощью Spark 2.3

Во-первых, я устанавливаю следующие настройки при создании моей SparkSession:

.config("spark.sql.sources.partitionOverwriteMode", "dynamic")

Затем я копирую некоторые данные в новую таблицу и разбиваю их по столбцу date_id.

ds
  .write
  .format("parquet")
  .option("compression", "snappy")
  .option("auto.purge", "true")
  .mode(saveMode)
  .partitionBy("date_id")
  .saveAsTable("tbl_copy")

В HDFS я вижу, что созданы соответствующие каталоги date_id.

Затем я создаю DataSetсодержащий данные для раздела, который я хочу перезаписать, который содержит данные для одного date_id и вставить в Hive следующим образом:

  ds
    .write
    .mode(SaveMode.Overwrite)
    .insertInto("tbl_copy")

В качестве проверки работоспособности я записываю тот же набор данных в новую таблицу.

      ds
        .write
        .format("parquet")
        .option("compression", "snappy")
        .option("auto.purge", "true")
        .mode(SaveMode.Overwrite)
        .saveAsTable("tmp_tbl")

Данные в tmp_tbl точно соответствуют ожидаемым.

Однако, когда я смотрю на tbl_copy, я вижу новый каталог HDFS `date_id = HIVE_DEFAULT_PARTITION

Запрос tbl_cpy

SELECT * from tbl_copy WHERE date_id IS NULL

Я вижу строки, которые должны были быть вставлены в раздел date_id = 20180523, однако столбец date_id имеет значение null и не имеет отношения к row_cстолбец зависания был заполнен значением 20180523.

Похоже, что вставка в Hive каким-то образом вызывает искажение моих данных.Запись того же набора данных в новую таблицу не вызывает проблем.

Может ли кто-нибудь пролить свет на это?

Ответы [ 2 ]

0 голосов
/ 27 мая 2019

Да, это сложное поведение, объясните в документе:

https://spark.apache.org/docs/2.1.2/api/java/org/apache/spark/sql/DataFrameWriter.html#insertInto(java.lang.String)

В отличие от saveAsTable, insertInto игнорирует имена столбцов и использует только позиционныеразрешение .Например:

    scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
    scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
    scala> Seq((5, 6)).toDF("a", "b").write.insertInto("t1")
    scala> sql("select * from t1").show
    +---+---+
    |  i|  j|
    +---+---+
    |  5|  6|
    |  3|  4|
    |  1|  2|
    +---+---+
0 голосов
/ 03 октября 2018

Похоже, что столбцы разделов должны быть последними в наборе данных.

Я решил проблему, перенеся следующий метод на набор данных [T].

def partitionsTail(partitionColumns: Seq[String]) = {
  val columns = dataset.schema.collect{ case s if !partitionColumns.contains(s.name) => s.name} ++ partitionColumns

  dataset.select(columns.head, columns.tail: _*).as[T]
} 
...