Как извлечь строку из фрейма данных и применить преобразования - PullRequest
2 голосов
/ 05 мая 2020
def fun(row: Row): Seq[String] = {
     // some logic
}

+-------+----------------+-------+----------------+
|   name|       High on life     | waka  |  my love
+-------+----------------+-------+----------------+
|beatles|      0                 |  0    |  0
|  romeo|      0                 |  0    |  0
+-------+---------------++-------+----------------+

// and fun will return for row 1 
Seq("waka","High on life") 
// and  for row 2 
Seq("waka","my love")

+-------+----------------+-------+----------------+
|   name|       High on life     | waka  |  my love
+-------+----------------+-------+----------------+
|beatles|      1                 |  1    |  0
|  romeo|      0                 |  1    |  1
+-------+---------------++-------+----------------+

В принципе, мне нужно предложение. Как я могу увеличить значение или изменить значение столбца для этого конкретного столбца строки?

Я новичок в spark и scala, поэтому, пожалуйста, скажите мне, как я могу также выполнить итерацию строки?

1 Ответ

2 голосов
/ 05 мая 2020

Вы можете изменять значения столбцов на основе предварительных условий или произвольно, filter и drop некоторые строки на основе предварительных условий. Преобразования Dataframes или RDDs всегда возвращают новые Dataframe или RDD.

Например,

 val spark = SparkSession  // We create Spark object
    .builder()
    .appName("MyApp")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id", "MyApp")  // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext // we create SparkContext object

      val data = sc.textFile(input) // read a file and create a RDD
      val head = data.first() // header 

      val multiline = data
          .filter(line => line != head) // remove header 
          .map(line => line.split('|')) //RDD[String] => RDD[Array[String]]
          .map(arr =>{ // Based on preconditions we transform the RDD[Array[String]]
            val sInt: Int = makeInt(arr(0)) // to RDD[(String, String)]
            if(sInt < 0) (sInt.toString, arr(0))
            else (arr(0),arr(1))
          })
          .toDF("column1", "column2") // RDD to Dataframe

      multiline.show() // Show the content of the Dataframe
      /*
      map, filter, show are HOFs (Higher Order Functions) that iterate the RDD or 
     Dataframe
     There are a lot of HOFs to transform in many ways a RDD or Dataframe.
     The API documentation: http://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.rdd.RDD
      */
...