Какова гарантия заказа при объединении двух столбцов искрового фрейма данных, которые обрабатываются отдельно? - PullRequest
1 голос
/ 09 февраля 2020

У меня есть фрейм данных с 3 столбцами

  1. дата
  2. jsonString1
  3. jsonString2

Я хочу расширить атрибуты внутри json в столбцы. поэтому я сделал что-то вроде этого.

 val json1 = spark.read.json(dataframe.select(col("jsonString1")).rdd.map(_.getString(0)))
 val json2 = spark.read.json(dataframe.select(col("jsonString2")).rdd.map(_.getString(0)))

 val json1Table = json1.selectExpr("id", "status")
 val json2Table = json2.selectExpr("name", "address")

Теперь я хочу собрать эти таблицы. поэтому я сделал следующее


     val json1TableWithIndex = addColumnIndex(json1Table)
     val json2TableWithIndex = addColumnIndex(json2Table)
     var finalResult = json1Table
            .join(json2Table, Seq("columnindex"))
            .drop("columnindex")

    def addColumnIndex(df: DataFrame) = spark.createDataFrame(
        df.rdd.zipWithIndex.map { case (row, columnindex) => Row.fromSeq(row.toSeq :+ columnindex) },
        StructType(df.schema.fields :+ StructField("columnindex", LongType, false))
    )

После выборки нескольких строк я заметил, что строки совпадают точно так же, как в исходном фрейме данных. Я не нашел никакой информации о гарантии заказа при объединении двух столбцов фрейма данных, которые обрабатываются отдельно , Это правильный способ решить мою проблему. Любая помощь приветствуется.

1 Ответ

0 голосов
/ 10 февраля 2020

Всегда рискованно полагаться на недокументированное поведение, поскольку ваш код может работать не так, как вы предполагали, потому что у вас есть только частичное понимание этого.

Вы можете сделать то же самое в гораздо более эффективном путь без использования разделения и объединения. Используйте функцию from_json, чтобы создать два вложенных столбца, а затем сгладить вложенные столбцы и, наконец, удалить промежуточные JSON строковые столбцы и вложенные столбцы.

Вот пример всего процесса.

import org.apache.spark.sql.types.{StringType, StructType, StructField}

val df = (Seq( 
("09-02-2020","{\"id\":\"01\", \"status\":\"Active\"}","{\"name\":\"Abdullah\", \"address\":\"Jumeirah\"}"), 
("10-02-2020","{\"id\":\"02\", \"status\":\"Dormant\"}","{\"name\":\"Ali\", \"address\":\"Jebel Ali\"}") 
).toDF("date","jsonString1","jsonString2"))

scala> df.show()
+----------+--------------------+--------------------+
|      date|         jsonString1|         jsonString2|
+----------+--------------------+--------------------+
|09-02-2020|{"id":"01", "stat...|{"name":"Abdullah...|
|10-02-2020|{"id":"02", "stat...|{"name":"Ali", "a...|
+----------+--------------------+--------------------+

val schema1 = (StructType(Seq(
  StructField("id", StringType, true), 
  StructField("status", StringType, true)
)))

val schema2 = (StructType(Seq(
  StructField("name", StringType, true), 
  StructField("address", StringType, true)
)))


val dfFlattened = (df.withColumn("jsonData1", from_json(col("jsonString1"), schema1))
            .withColumn("jsonData2", from_json(col("jsonString2"), schema2))
            .withColumn("id", col("jsonData1.id"))
            .withColumn("status", col("jsonData1.status"))
            .withColumn("name", col("jsonData2.name"))
            .withColumn("address", col("jsonData2.address"))
            .drop("jsonString1")
            .drop("jsonString2")
            .drop("jsonData1")
            .drop("jsonData2"))         

scala> dfFlattened.show()
+----------+---+-------+--------+---------+
|      date| id| status|    name|  address|
+----------+---+-------+--------+---------+
|09-02-2020| 01| Active|Abdullah| Jumeirah|
|10-02-2020| 02|Dormant|     Ali|Jebel Ali|
+----------+---+-------+--------+---------+   
...