Как сохранить повторную запись строки в новом фрейме данных или списке во время цикла, используя Spark SQL? - PullRequest
0 голосов
/ 21 октября 2019

У меня есть один фрейм данных. Этот фрейм данных дает мне список записей, а затем я собираюсь выполнить итерацию по каждой строке и выполнить некоторые манипуляции.

 for (row <- dataframe.rdd.collect()) {

// var anyval= row.mkString(",").split(",")(take the column);
}

Затем я делаю некоторые проверки, а затем, если текущая строка будет соответствовать требованию, попробуйтесоздать новый список или коллекцию для сохранения полной строки.

Не могли бы вы помочь с примером, как сохранить эту строку в новом фрейме данных с помощью spark sql?

1 Ответ

0 голосов
/ 01 ноября 2019

Существуют разные способы достижения этого, главное - понять основные компоненты поведения spark. Для всех из них (dataframe, dataset, rdd) вы не можете обновить фактическое значение, они являются неизменяемыми объектами, но вы можете перебирать их элементы и, основываясь на своей логике, создавать новый, основываясь насуществующийПримеры:

val yourDF = Seq( // Sample
  ("A1", 12, null),       // Record 1
  ("B1", -1, "Mexico"),   // Record 2
  ("C1", 2, "Argentina")  // Record 3
).toDF("id", "some_value", "country") // Column definition

yourDF.show() // Visualize your DF

Вышеприведенный код выведет:

+---+----------+---------+
| id|some_value|  country|
+---+----------+---------+
| A1|        12|     null|
| B1|        -1|   Mexico|
| C1|         2|Argentina|
+---+----------+---------+

Исходя из того, что это Dataframe, вы можете перебрать все строки и получить доступ к их элементам:

val newDF = yourDF
  .map(item =>{  // Iterate your DF 
    val id = item.getAs[String]("id") // Access their element (from row object - each item in your DF) - You need to specify datatype and 'column_name' on this approach
    val some_value = item.getAs[Integer]("some_value")
    val country = item.getAs[String]("country")
    val outputCountry = if(country != null) country.substring(0,3) else null
    // Output: id, first 3 chars of the country (if it is not null) and `some_value` multiplied by 10
    (id, outputCountry, some_value*10)
  })

newDF.show()

Вышеприведенный код выведет:

+---+----+---+
| _1|  _2| _3|
+---+----+---+
| A1|null|120|
| B1| Mex|-10|
| C1| Arg| 20|
+---+----+---+

Как видите, имена столбцов не совпадают с первыми DF, это потому, что мы создаем новый, и мы сделалиНе указывайте имена столбцов, мы можем использовать либо .toDF("column_a", "column_b", "column_c"), либо использовать класс case, как в следующем примере.

Давайте сделаем то же самое упражнение, но с использованием case classesScala).

case class Country(id: String, some_value: Integer, country: String) // Case class

val newDF = yourDF
  .as[Country] // Cast your DF with a case class to have a Dataset
  .map(country=>{ // iterate dataset
    val id = country.id // Access their element (as object notation, easier!)
    val some_value = country.some_value
    val countryName = country.country
    val outputCountry = if(countryName != null) countryName.substring(0,3) else null
    // Output: id, first 3 chars of the country (if it is not null) and `some_value` multiplied by 10
    Country(id, some_value*10, outputCountry) // Output will use a case class to define the schema of the new object (DAtaset[Country])
  })

newDF.show()

Над кодом будет выводиться:

+---+----------+-------+
| id|some_value|country|
+---+----------+-------+
| A1|       120|   null|
| B1|       -10|    Mex|
| C1|        20|    Arg|
+---+----------+-------+
...