Обновление значения столбца в цикле в искре - PullRequest
6 голосов
/ 18 июня 2019

Краткий вопрос :

Для более прямого запроса я хочу последовательно выполнить все строки и назначить некоторые значения некоторым переменным (a, b, c), основываясь на определенных условиях для конкретной строки, тогда я бы назначил значение 1 из этих переменных в столбец этой конкретной строки.

Подробное

Я хочу обновить значение столбца во фрейме данных в spark. Обновление будет условным, при этом я буду запускать цикл в строке и обновлять столбец на основе значений других столбцов этой строки.

Я пытался использовать подход Column, но получил ошибку. Пожалуйста, предложите любой другой подход. Разрешение подхода withColumn также очень поможет.

Таблица

var table1 = Seq((11, 25, 2, 0), (42, 20, 10, 0)).toDF("col_1", "col_2", "col_3", "col_4")
table1.show()

Схема

+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|   11|   25|    2|    0|
|   42|   20|   10|    0|
+-----+-----+-----+-----+

Я попробовал 2 подхода здесь:

  1. withColumn
  2. i ("col_4") = adj_c

В приведенном ниже коде переменные, инициализированные в разных местах, должны быть размещены только таким образом, согласно условиям

Код

for(i <- table1.rdd.collect()) {
    if(i.getAs[Int]("col_1") > 0) {
       var adj_a = 0
       var adj_c = 0
        if(i.getAs[Int]("col_1") > (i.getAs[Int]("col_2") + i.getAs[Int]("col_3"))) {
            if(i.getAs[Int]("col_1") < i.getAs[Int]("col_2")) {
                adj_a = 10
                adj_c = 2
            }
            else {
                adj_a = 5
            }
        }
        else {
            adj_c = 1
        }
        adj_c = adj_c + i.getAs[Int]("col_2")
        table1.withColumn("col_4", adj_c)
         //i("col_4")  = adj_c
    }
}

Ошибка в первом случае :

table1.withColumn ("col_4", adj_c)

<console>:80: error: type mismatch;
 found   : Int
 required: org.apache.spark.sql.Column
               table1.withColumn("col_4", adj_c)
                                          ^

Я также попытался использовать col (adj_c) здесь, но он начал работать с

<console>:80: error: type mismatch;
 found   : Int
 required: String
               table1.withColumn("col_4", col(adj_c))
                                              ^

Ошибка во втором случае :

(i ("col_4") = adj_c)

<console>:81: error: value update is not a member of org.apache.spark.sql.Row
                i("col_4")  = adj_c
                ^

Я хочу, чтобы выходная таблица была:

+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|   11|   25|    2|    1|
|   42|   20|   10|    5|
+-----+-----+-----+-----+

Пожалуйста, предложите возможные решения и в случае сомнений вернитесь к вопросу.

Пожалуйста, помогите мне с этим, потому что я застрял в проблеме. Любое предложение будет очень полезным.

Ответы [ 3 ]

4 голосов
/ 24 июня 2019

UDF может использоваться с любой пользовательской логикой для значения столбца вычисления, например:

val calculateCol4 = (col_1:Int, col_2:Int, col_3:Int)  =>
  if (col_1 > 0) {

    var adj_a = 0
    var adj_c = 0
    if (col_1 > col_2 + col_3) {
      if (col_1 < col_2) {
        adj_a = 10
        adj_c = 2
      }
      else {
        adj_a = 5
      }
    }
    else {
      adj_c = 1
    }
    println("adj_c: "+adj_c)
    adj_c = adj_c + col_2
    // added for return correct result
    adj_c
  }
  // added for return correct result
  else 0

val col4UDF = udf(calculateCol4)
table1.withColumn("col_4",col4UDF($"col_1", $"col_2", $"col_3"))
4 голосов
/ 18 июня 2019

Вы должны использовать функцию when вместо такого сложного синтаксиса, также нет необходимости в явном цикле, Spark обрабатывает его сам. Когда вы выполняете withColumn, оно применяется к каждой строке

table1.withColumn("col_4", when($"col_1" > $"col_2" + $"col_3", 5).otherwise(1)).show

БЫСТРЫЙ ТЕСТ:

ВХОД

table1.show

-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|   11|   25|    2|    0|
|   42|   20|   10|    0|
+-----+-----+-----+-----+

OUTPUT

table1.withColumn("col_4", when($"col_1" > $"col_2" + $"col_3", lit(5)).otherwise(1)).show
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|   11|   25|    2|    1|
|   42|   20|   10|    5|
+-----+-----+-----+-----+
3 голосов
/ 26 июня 2019

с использованием spark.sql, более легко читать и понимать -

scala> var table1 = Seq((11, 25, 2, 0), (42, 20, 10, 0)).toDF("col_1", "col_2", "col_3", "col_4")
table1: org.apache.spark.sql.DataFrame = [col_1: int, col_2: int ... 2 more fields]

scala> table1.show()
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|   11|   25|    2|    0|
|   42|   20|   10|    0|
+-----+-----+-----+-----+

scala> table1.createOrReplaceTempView("table1")


scala> val result = spark.sql(s""" select col_1,
     |                                    col_2,
     |                                    col_3,
     |                                    CASE WHEN col_1 > (col_2 + col_3)
     |                                           THEN 5
     |                                         ELSE   1
     |                                    END as col_4
     |                              from  table1 """)
result: org.apache.spark.sql.DataFrame = [col_1: int, col_2: int ... 2 more fields]


scala> result.show(false)
+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|
+-----+-----+-----+-----+
|11   |25   |2    |1    |
|42   |20   |10   |5    |
+-----+-----+-----+-----+

Надеюсь, это полезно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...