Spark - Как добавить элемент в массив структур - PullRequest
0 голосов
/ 14 января 2019

Имея эту схему:

root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)

Как мы можем добавить новое поле , как это?

root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- New_field: integer (nullable = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)

Я уже сделал это с простой структурой (более подробно в нижней части этого поста), но я не могу сделать это с массивом struct.

Это код для проверки:

val schema = new StructType()
    .add("Elems", ArrayType(new StructType()
        .add("Elem", IntegerType)
        .add("Desc", StringType)
    ))

val dataDS = Seq("""
{
  "Elems": [ {"Elem":1, "Desc": "d1"}, {"Elem":2, "Desc": "d2"}, {"Elem":3, "Desc": "d3"} ]
}
""").toDS()

val df = spark.read.schema(schema).json(dataDS.rdd)

df.show(false)
+---------------------------+
|Elems                      |
+---------------------------+
|[[1, d1], [2, d2], [3, d3]]|
+---------------------------+

Как только мы получим DF, лучший подход, который у меня есть, - это создание структуры массивов для каждого элемента:

val mod_df = df.withColumn("modif_elems", 
     struct(
         array(lit("")).as("New_field"),
         col("Elems.Elem"),
         col("Elems.Desc")
                            ))

mod_df.show(false)
+---------------------------+-----------------------------+
|Elems                      |modif_elems                  |
+---------------------------+-----------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[], [1, 2, 3], [d1, d2, d3]]|
+---------------------------+-----------------------------+


mod_df.printSchema
root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)
 |-- modif_elems: struct (nullable = false)
 |    |-- New_field: array (nullable = false)
 |    |    |-- element: string (containsNull = false)
 |    |-- Elem: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |    |-- Desc: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

Мы не теряем никаких данных, но это не совсем то, что я хочу.

Обновление : Обходной путь в PD1.


Бонус-трек: изменение структуры (не в массиве)

Код почти такой же, но теперь у нас нет массива struct, поэтому проще изменить структуру:

val schema = new StructType()
    .add("Elems", new StructType()
        .add("Elem", IntegerType)
        .add("Desc", StringType)
    )


val dataDS = Seq("""
{
  "Elems": {"Elem":1, "Desc": "d1"}
}
""").toDS()    


val df = spark.read.schema(schema).json(dataDS.rdd)
df.show(false)
+-------+
|Elems  |
+-------+
|[1, d1]|
+-------+

df.printSchema
root
 |-- Elems: struct (nullable = true)
 |    |-- Elem: integer (nullable = true)
 |    |-- Desc: string (nullable = true)

В этом случае для добавления поля нам нужно создать еще одну структуру:

val mod_df = df
    .withColumn("modif_elems", 
                struct(
                    lit("").alias("New_field"),
                    col("Elems.Elem"),
                    col("Elems.Desc")
                    )
               )

mod_df.show
+-------+-----------+
|  Elems|modif_elems|
+-------+-----------+
|[1, d1]|  [, 1, d1]|
+-------+-----------+

mod_df.printSchema
root
 |-- Elems: struct (nullable = true)
 |    |-- Elem: integer (nullable = true)
 |    |-- Desc: string (nullable = true)
 |-- modif_elems: struct (nullable = false)
 |    |-- New_field: string (nullable = false)
 |    |-- Elem: integer (nullable = true)
 |    |-- Desc: string (nullable = true)


PD1:

Хорошо, я использовал arrays_zip Функция Spark SQL (впервые в версии 2.4.0), и это почти то, что я хочу, но я не вижу, как мы можем изменить имена элементов ( как или псевдоним здесь не работает):

val mod_df = df.withColumn("modif_elems", 
        arrays_zip(
            array(lit("")).as("New_field"),
            col("Elems.Elem").as("Elem"),
            col("Elems.Desc").alias("Desc")
                    )
        )

mod_df.show(false)
+---------------------------+---------------------------------+
|Elems                      |modif_elems                      |
+---------------------------+---------------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[, 1, d1], [, 2, d2], [, 3, d3]]|
+---------------------------+---------------------------------+

mod_df.printSchema
root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)
 |-- modif_elems: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- 0: string (nullable = true)
 |    |    |-- 1: integer (nullable = true)
 |    |    |-- 2: string (nullable = true)

Struct modif_elems содержит 3 элемента с именами New_field , Элемент и Desc , а не 0 , 1 и 2 .

1 Ответ

0 голосов
/ 24 января 2019

Решение здесь. Нам нужно использовать arrays_zip и затем переименовать полученный столбец:

val mod_df = df
    .withColumn("modif_elems_NOT_renamed", 
        arrays_zip(
            array(lit("")).as("New_field"),
            col("Elems.Elem").as("ElemRenamed"),
            col("Elems.Desc").alias("DescRenamed")
                    ))
    .withColumn("modif_elems_renamed", 
               $"modif_elems_NOT_renamed".cast(ArrayType(elem_struct_recomposed)))


mod_df.show(false)
mod_df.printSchema

+---------------------------+---------------------------------+---------------------------------+
|Elems                      |modif_elems_NOT_renamed          |modif_elems_renamed              |
+---------------------------+---------------------------------+---------------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[, 1, d1], [, 2, d2], [, 3, d3]]|[[, 1, d1], [, 2, d2], [, 3, d3]]|
+---------------------------+---------------------------------+---------------------------------+

root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)
 |-- modif_elems_NOT_renamed: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- 0: string (nullable = true)
 |    |    |-- 1: integer (nullable = true)
 |    |    |-- 2: string (nullable = true)
 |-- modif_elems_renamed: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- New_field: string (nullable = true)
 |    |    |-- ElemRenamed: integer (nullable = true)
 |    |    |-- DescRenamed: string (nullable = true)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...