Существуют разные способы достижения этого, главное - понять основные компоненты поведения spark
. Для всех из них (dataframe
, dataset
, rdd
) вы не можете обновить фактическое значение, они являются неизменяемыми объектами, но вы можете перебирать их элементы и, основываясь на своей логике, создавать новый, основываясь насуществующийПримеры:
val yourDF = Seq( // Sample
("A1", 12, null), // Record 1
("B1", -1, "Mexico"), // Record 2
("C1", 2, "Argentina") // Record 3
).toDF("id", "some_value", "country") // Column definition
yourDF.show() // Visualize your DF
Вышеприведенный код выведет:
+---+----------+---------+
| id|some_value| country|
+---+----------+---------+
| A1| 12| null|
| B1| -1| Mexico|
| C1| 2|Argentina|
+---+----------+---------+
Исходя из того, что это Dataframe
, вы можете перебрать все строки и получить доступ к их элементам:
val newDF = yourDF
.map(item =>{ // Iterate your DF
val id = item.getAs[String]("id") // Access their element (from row object - each item in your DF) - You need to specify datatype and 'column_name' on this approach
val some_value = item.getAs[Integer]("some_value")
val country = item.getAs[String]("country")
val outputCountry = if(country != null) country.substring(0,3) else null
// Output: id, first 3 chars of the country (if it is not null) and `some_value` multiplied by 10
(id, outputCountry, some_value*10)
})
newDF.show()
Вышеприведенный код выведет:
+---+----+---+
| _1| _2| _3|
+---+----+---+
| A1|null|120|
| B1| Mex|-10|
| C1| Arg| 20|
+---+----+---+
Как видите, имена столбцов не совпадают с первыми DF
, это потому, что мы создаем новый, и мы сделалиНе указывайте имена столбцов, мы можем использовать либо .toDF("column_a", "column_b", "column_c")
, либо использовать класс case, как в следующем примере.
Давайте сделаем то же самое упражнение, но с использованием case classes
(с Scala
).
case class Country(id: String, some_value: Integer, country: String) // Case class
val newDF = yourDF
.as[Country] // Cast your DF with a case class to have a Dataset
.map(country=>{ // iterate dataset
val id = country.id // Access their element (as object notation, easier!)
val some_value = country.some_value
val countryName = country.country
val outputCountry = if(countryName != null) countryName.substring(0,3) else null
// Output: id, first 3 chars of the country (if it is not null) and `some_value` multiplied by 10
Country(id, some_value*10, outputCountry) // Output will use a case class to define the schema of the new object (DAtaset[Country])
})
newDF.show()
Над кодом будет выводиться:
+---+----------+-------+
| id|some_value|country|
+---+----------+-------+
| A1| 120| null|
| B1| -10| Mex|
| C1| 20| Arg|
+---+----------+-------+