Примените функцию к столбцу внутри структуры Spark DataFrame, заменив этот столбец - PullRequest
0 голосов
/ 30 апреля 2020

Я не могу найти именно то, что я ищу, поэтому вот мой вопрос. Я извлекаю из MongoDb некоторые данные в Spark Dataframe. Фрейм данных имеет следующую схему (df.printSchema):

|-- flight: struct (nullable = true)
|    |-- legs: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- arrival: timestamp (nullable = true)
|    |    |    |-- departure: timestamp (nullable = true)
|    |-- segments: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- arrival: timestamp (nullable = true)
|    |    |    |-- departure: timestamp (nullable = true)

Обратите внимание на структуру верхнего уровня, за которой следует массив, внутри которого мне нужно изменить свои данные. Например:

{
  "flight": {
    "legs": [{
        "departure": ISODate("2020-10-30T13:35:00.000Z"),
        "arrival": ISODate("2020-10-30T14:47:00.000Z")
      }
    ],
    "segments": [{
        "departure": ISODate("2020-10-30T13:35:00.000Z"),
        "arrival": ISODate("2020-10-30T14:47:00.000Z")
      }
    ]
  }
}

Я хочу экспортировать это в Json, но по какой-то деловой причине я хочу, чтобы даты прибытия имели другой формат, чем даты отъезда. Например, я могу экспортировать ISODate отправления в мс из эпохи, но не прибытия.

Для этого я подумал о применении пользовательской функции для преобразования:

  // Here I can do any tranformation. I hope to replace the timestamp with the needed value
  val doSomething: UserDefinedFunction = udf(  (value: Seq[Timestamp]) => {
    value.map(x => "doSomething" + x.getTime) }
  )

  val newDf = df.withColumn("flight.legs.departure",
    doSomething(df.col("flight.legs.departure")))

Но это просто возвращает совершенно новый столбец, содержащий массив из одной строки doSomething.

{
  "flight": {
    "legs": [{
        "arrival": "2020-10-30T14:47:00Z",
        "departure": "2020-10-30T13:35:00Z"
      }
    ],
    "segments": [{
        "arrival": "2020-10-30T14:47:00Z",
        "departure": "2020-10-30T13:35:00Z",
      }
    ]
  },
  "flight.legs.departure": ["doSomething1596268800000"]
}

И newDf.show(1)

+--------------------+---------------------+
|              flight|flight.legs.departure|
+--------------------+---------------------+
|[[[182], 94, [202...| [doSomething15962...|
+--------------------+---------------------+

Вместо

{
  ...
        "arrival": "2020-10-30T14:47:00Z",
        //leg departure date that I changed
        "departure": "doSomething1596268800000"
  ...   // segments not affected in this example
        "arrival": "2020-10-30T14:47:00Z",
        "departure": "2020-10-30T13:35:00Z",
 ...
}

Есть идеи, как поступить?

Редактировать - уточнение : Имейте в виду, что моя схема намного сложнее, чем показанная выше. Например, есть еще один тег data верхнего уровня, поэтому flight ниже вместе с другой информацией. Затем внутри flight, legs и segments есть еще несколько элементов, некоторые из которых также являются вложенными. Я сосредоточился только на тех, что мне нужно было изменить.

Я говорю это, потому что я хотел бы самое простое решение, которое будет масштабироваться. Т.е. в идеале тот, который просто изменил бы требуемые элементы без необходимости деконструкции, и перестроил бы всю вложенную структуру. Если мы не можем избежать этого, является ли использование case-классов самым простым решением?

Ответы [ 2 ]

1 голос
/ 03 мая 2020

Пожалуйста, проверьте код ниже.

Время выполнения

With UDF: Время: 679 мс

Without UDF: Время: 1493 мс

Code With UDF

scala> :paste
// Entering paste mode (ctrl-D to finish)

  // Creating UDF to update value inside array.
  import java.text.SimpleDateFormat
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss") // For me departure values are in string, so using this to convert sql timestmap.
  val doSomething = udf((value: Seq[String]) => {
     value.map(x => s"dosomething${dateFormat.parse(x).getTime}")
  })

// Exiting paste mode, now interpreting.

import java.text.SimpleDateFormat
dateFormat: java.text.SimpleDateFormat = java.text.SimpleDateFormat@41bd83a
doSomething: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(StringType,true),Some(List(ArrayType(StringType,true))))


scala> :paste
// Entering paste mode (ctrl-D to finish)

spark.time {
val updated = df.select("flight.*").withColumn("legs",arrays_zip($"legs.arrival",doSomething($"legs.departure")).cast("array<struct<arrival:string,departure:string>>")).select(struct($"segments",$"legs").as("flight"))
updated.printSchema
updated.show(false)
}

// Exiting paste mode, now interpreting.

root
 |-- flight: struct (nullable = false)
 |    |-- segments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- arrival: string (nullable = true)
 |    |    |    |-- departure: string (nullable = true)
 |    |-- legs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- arrival: string (nullable = true)
 |    |    |    |-- departure: string (nullable = true)

+-------------------------------------------------------------------------------------------------+
|flight                                                                                           |
+-------------------------------------------------------------------------------------------------+
|[[[2020-10-30T14:47:00, 2020-10-30T13:35:00]], [[2020-10-30T14:47:00, dosomething1604045100000]]]|
+-------------------------------------------------------------------------------------------------+

Time taken: 679 ms

scala>

Code Without UDF

scala> val df = spark.read.json(Seq("""{"flight": {"legs": [{"departure": "2020-10-30T13:35:00","arrival": "2020-10-30T14:47:00"}],"segments": [{"departure": "2020-10-30T13:35:00","arrival": "2020-10-30T14:47:00"}]}}""").toDS)
df: org.apache.spark.sql.DataFrame = [flight: struct<legs: array<struct<arrival:string,departure:string>>, segments: array<struct<arrival:string,departure:string>>>]

scala> df.printSchema
root
 |-- flight: struct (nullable = true)
 |    |-- legs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- arrival: string (nullable = true)
 |    |    |    |-- departure: string (nullable = true)
 |    |-- segments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- arrival: string (nullable = true)
 |    |    |    |-- departure: string (nullable = true)


scala> df.show(false)
+--------------------------------------------------------------------------------------------+
|flight                                                                                      |
+--------------------------------------------------------------------------------------------+
|[[[2020-10-30T14:47:00, 2020-10-30T13:35:00]], [[2020-10-30T14:47:00, 2020-10-30T13:35:00]]]|
+--------------------------------------------------------------------------------------------+


scala> :paste
// Entering paste mode (ctrl-D to finish)

spark.time {
val updated= df
            .select("flight.*")
            .select($"segments",$"legs.arrival",$"legs.departure") // extracting legs struct column values.
            .withColumn("departure",explode($"departure")) // exploding departure column
            .withColumn("departure",concat_ws("-",lit("something"),$"departure".cast("timestamp").cast("long"))) // updating departure column values
            .groupBy($"segments",$"arrival") // grouping columns except legs column
            .agg(collect_list($"departure").as("departure")) // constructing list back
            .select($"segments",arrays_zip($"arrival",$"departure").as("legs")) // construction arrival & departure columns using arrays_zip method.
            .select(struct($"legs",$"segments").as("flight")) // finally creating flight by combining legs & segments columns.

  updated.printSchema
  updated.show(false)
}

// Exiting paste mode, now interpreting.

root
 |-- flight: struct (nullable = false)
 |    |-- legs: array (nullable = true)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- arrival: string (nullable = true)
 |    |    |    |-- departure: string (nullable = true)
 |    |-- segments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- arrival: string (nullable = true)
 |    |    |    |-- departure: string (nullable = true)

+---------------------------------------------------------------------------------------------+
|flight                                                                                       |
+---------------------------------------------------------------------------------------------+
|[[[2020-10-30T14:47:00, something-1604045100]], [[2020-10-30T14:47:00, 2020-10-30T13:35:00]]]|
+---------------------------------------------------------------------------------------------+

Time taken: 1493 ms

scala>

0 голосов
/ 30 апреля 2020

Попробуйте это

scala> df.show(false)
+----------------------------------------------------------------------------------------------------------------+
|flight                                                                                                          |
+----------------------------------------------------------------------------------------------------------------+
|[[[2020-10-30T13:35:00.000Z, 2020-10-30T14:47:00.000Z]], [[2020-10-30T13:35:00.000Z, 2020-10-30T14:47:00.000Z]]]|
|[[[2020-10-25T13:15:00.000Z, 2020-10-25T14:37:00.000Z]], [[2020-10-25T13:15:00.000Z, 2020-10-25T14:37:00.000Z]]]|
+----------------------------------------------------------------------------------------------------------------+


scala> 

scala> df.printSchema
root
 |-- flight: struct (nullable = true)
 |    |-- legs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- dep: string (nullable = true)
 |    |    |    |-- arr: string (nullable = true)
 |    |-- segments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- dep: string (nullable = true)
 |    |    |    |-- arr: string (nullable = true)


scala> 

scala> val myudf = udf(
     |   (arrs:Seq[String]) => {
     |     arrs.map("something" ++ _)
     |   } 
     | )
myudf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(StringType,true),Some(List(ArrayType(StringType,true))))

scala> val df2 = df.select($"flight", myudf($"flight.legs.arr") as "editedArrs")
df2: org.apache.spark.sql.DataFrame = [flight: struct<legs: array<struct<dep:string,arr:string>>, segments: array<struct<dep:string,arr:string>>>, editedArrs: array<string>]

scala> df2.printSchema
root
 |-- flight: struct (nullable = true)
 |    |-- legs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- dep: string (nullable = true)
 |    |    |    |-- arr: string (nullable = true)
 |    |-- segments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- dep: string (nullable = true)
 |    |    |    |-- arr: string (nullable = true)
 |-- editedArrs: array (nullable = true)
 |    |-- element: string (containsNull = true)


scala> df2.show(false)
+----------------------------------------------------------------------------------------------------------------+-----------------------------------+
|flight                                                                                                          |editedArrs                         |
+----------------------------------------------------------------------------------------------------------------+-----------------------------------+
|[[[2020-10-30T13:35:00.000Z, 2020-10-30T14:47:00.000Z]], [[2020-10-30T13:35:00.000Z, 2020-10-30T14:47:00.000Z]]]|[something2020-10-30T14:47:00.000Z]|
|[[[2020-10-25T13:15:00.000Z, 2020-10-25T14:37:00.000Z]], [[2020-10-25T13:15:00.000Z, 2020-10-25T14:37:00.000Z]]]|[something2020-10-25T14:37:00.000Z]|
+----------------------------------------------------------------------------------------------------------------+-----------------------------------+


scala> 

scala> 

scala> val df3 = df2.select(struct(arrays_zip($"flight.legs.dep", $"editedArrs") cast "array<struct<dep:string,arr:string>>" as "legs", $"flight.segments") as "flight")
df3: org.apache.spark.sql.DataFrame = [flight: struct<legs: array<struct<dep:string,arr:string>>, segments: array<struct<dep:string,arr:string>>>]

scala> 

scala> df3.printSchema
root
 |-- flight: struct (nullable = false)
 |    |-- legs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- dep: string (nullable = true)
 |    |    |    |-- arr: string (nullable = true)
 |    |-- segments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- dep: string (nullable = true)
 |    |    |    |-- arr: string (nullable = true)


scala> 

scala> df3.show(false)
+-------------------------------------------------------------------------------------------------------------------------+
|flight                                                                                                                   |
+-------------------------------------------------------------------------------------------------------------------------+
|[[[2020-10-30T13:35:00.000Z, something2020-10-30T14:47:00.000Z]], [[2020-10-30T13:35:00.000Z, 2020-10-30T14:47:00.000Z]]]|
|[[[2020-10-25T13:15:00.000Z, something2020-10-25T14:37:00.000Z]], [[2020-10-25T13:15:00.000Z, 2020-10-25T14:37:00.000Z]]]|
+-------------------------------------------------------------------------------------------------------------------------+


Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...