перезаписать набор данных в конкретном разделе не работает в версии 2.4 - PullRequest
0 голосов
/ 18 марта 2020

В моей работе последний шаг - сохранить выполненные данные в таблице Hive с разделением на столбце «дата». Иногда из-за сбоя задания мне нужно заново запускать задание только для определенного раздела. Как уже отмечалось, когда я использую приведенный ниже код, spark переопределяет все разделы при использовании режима перезаписи.

ds.write.partitionBy("date").mode("overwrite").saveAsTable("test.someTable")

Пройдя через несколько блогов и стекопоток, я выполнил следующие шаги, чтобы перезаписать только определенные разделы.

Step 1: Enbable dynamic partition for overwrite mode
spark.conf.set("spark.sql.sources.partitionOverWriteMode", "dynamic")

Step 2: write dataframe to hive table using saveToTable

Seq(("Company1", "A"), 
("Company2","B"))
.toDF("company", "id")
.write
.mode(SaveMode.Overwrite)
.partitionBy("id")
.saveAsTable(targetTable)

spark.sql(s"SELECT * FROM ${targetTable}").show(false)
spark.sql(s"show partitions ${targetTable}").show(false)

Seq(("CompanyA3", "A"))
.toDF("company", "id")
.write
.mode(SaveMode.Overwrite)
.insertInto(targetTable)

spark.sql(s"SELECT * FROM ${targetTable}").show(false)
spark.sql(s"show partitions ${targetTable}").show(false)

Тем не менее он перезаписывает все разделы. enter image description here enter image description here

Согласно этому блогу, https://www.waitingforcode.com/apache-spark-sql/apache-spark-sql-hive-insertinto-command/read , "insertinto" должен перезаписывать только определенные разделы

Если я сначала создаю таблицу, а затем использую метод "insertinto", она работает нормально

Установите требуемую конфигурацию, enter image description here Шаг 1: Создать таблицу

enter image description here

Шаг 2: Добавить данные, используя метод insertinto enter image description here

Шаг 3: Перезаписать разделение enter image description here

Я хотел бы знать, в чем разница между созданием таблицы кустов с помощью SaveToTable и созданием таблицы вручную? Почему это не работает в первом сценарии? Может ли кто-нибудь помочь мне в этом?

1 Ответ

0 голосов
/ 18 марта 2020

Попробуйте строчными буквами w!

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

not

spark.conf.set("spark.sql.sources.partitionOverWriteMode", "dynamic")

Это обмануло меня. У вас есть 2 варианта использования в вашем сценарии, если вы посмотрите.

Мой первоначальный ответ устарел, кажется.

...