спарк 2.3.1 вставьте, чтобы удалить значение полей и изменить его на ноль - PullRequest
0 голосов
/ 03 января 2019

Я просто обновил свой искровой кластер с 2.2.1 до 2.3.1, чтобы использовать функцию перезаписи определенных разделов. см. Ссылку .

Но .... По какой-то причине, когда я тестирую его, я получаю очень странное поведение, см. Код:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
case class MyRow(partitionField: Int, someId: Int, someText: String)
object ExampleForStack2 extends App{
  val sparkConf = new SparkConf()
  sparkConf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
  sparkConf.setMaster(s"local[2]")
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  val list1 = List(
    MyRow(1, 1, "someText")
      ,MyRow(2, 2, "someText2")
  )
  val list2 = List(
    MyRow(1, 1, "someText modified")
    ,MyRow(3, 3, "someText3")
  )
  val df = spark.createDataFrame(list1)
  val df2 = spark.createDataFrame(list2)

  df2.show(false)
  df.write.partitionBy("partitionField").option("path","/tmp/tables/").saveAsTable("my_table")
  df2.write.mode(SaveMode.Overwrite).insertInto("my_table")
  spark.sql("select * from my_table").show(false)
}

И вывод:

+--------------+------+-----------------+
|partitionField|someId|someText         |
+--------------+------+-----------------+
|1             |1     |someText modified|
|3             |3     |someText3        |
+--------------+------+-----------------+

+------+---------+--------------+
|someId|someText |partitionField|
+------+---------+--------------+
|2     |someText2|2             |
|1     |someText |1             |
|3     |3        |null          |
|1     |1        |null          |
+------+---------+--------------+

Почему я получаю эти нули?Кажется, что поля были перемещены?но почему?

Спасибо

1 Ответ

0 голосов
/ 03 января 2019

Хорошо, он найден, вставка в основывается на позиции полей. см. документацию

В отличие от saveAsTable, insertInto игнорирует имена столбцов и использует только разрешение на основе позиции. Например:

scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
scala> Seq((5, 6)).toDF("a", "b").write.insertInto("t1")
scala> sql("select * from t1").show
+---+---+
|  i|  j|
+---+---+
|  5|  6|
|  3|  4|
|  1|  2|
+---+---+

Поскольку он вставляет данные в существующую таблицу, формат или параметры быть проигнорированным.

Более того, я использую динамический раздел, который должен отображаться как последнее поле. Таким образом, решение состоит в том, чтобы переместить динамические разделы в конец кадра данных, что в моем случае означает:

df2.select("someId", "someText","partitionField").write.mode(SaveMode.Overwrite).insertInto("my_table")

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...