Обновление значения массива столбца зависит от значения другого массива столбца в том же кадре данных в Spark- Scala - PullRequest
0 голосов
/ 13 июля 2020

В кадре данных почти 1500+ столбцов, где существует различный набор массивов столбцов, например: col1, col2, col3, col4 ...., затем c1, c2, c3, c4 ......, затем столбец1, столбец2, столбец3, столбец4 ....... et c et c. Теперь на основе определенных logi c и условий необходимо обновить / изменить подмножество массива столбцов с таким же именем. Предположим, что logi c выглядит так:

col(i) = 2 *column(i) + 3* c(i)

и

col(i) = 2* column(i) + col(i)

, где i находится в диапазоне от верхнего до нижнего пределы подмножества массива. Здесь подмножество массива col1-col4 может быть col1, col2, col3.

Мне нужно выражение, в котором могут быть выражены две вышеупомянутые операции.

Я даю пример для выражения ниже :

col(i) = col(i) + 1

Пример:

scala> val original_df = Seq((1,2,3,4,9,8,7,6),(2,3,4,5,8,7,6,5),(3,4,5,6,7,6,5,4),(4,5,6,7,6,5,4,3),(5,6,7,8,5,4,3,2),(6,7,8,9,4,3,2,1)).toDF("col1","col2","col3","col4","c1","c2","c3","c4")
original_df: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 6 more fields]

scala> original_df.show()
+----+----+----+----+---+---+---+---+
|col1|col2|col3|col4| c1| c2| c3| c4|
+----+----+----+----+---+---+---+---+
|   1|   2|   3|   4|  9|  8|  7|  6|
|   2|   3|   4|   5|  8|  7|  6|  5|
|   3|   4|   5|   6|  7|  6|  5|  4|
|   4|   5|   6|   7|  6|  5|  4|  3|
|   5|   6|   7|   8|  5|  4|  3|  2|
|   6|   7|   8|   9|  4|  3|  2|  1|
+----+----+----+----+---+---+---+---+


scala> val requiredColumns = original_df.columns.zipWithIndex.filter(_._2 < 3).map(_._1).toSet
requiredColumns: scala.collection.immutable.Set[String] = Set(col1, col2, col3)


scala> val allColumns = original_df.columns
allColumns: Array[String] = Array(col1, col2, col3, col4, c1, c2, c3, c4)


scala> val columnExpr = allColumns.filterNot(requiredColumns(_)).map(col(_)) ++ requiredColumns.map(c => (col(c) + 1).alias(c))
columnExpr: Array[org.apache.spark.sql.Column] = Array(col4, c1, c2, c3, c4, (col1 + 1) AS `col1`, (col2 + 1) AS `col2`, (col3 + 1) AS `col3`)


scala> original_df.select(columnExpr:_*).show(false)
+----+---+---+---+---+----+----+----+
|col4|c1 |c2 |c3 |c4 |col1|col2|col3|
+----+---+---+---+---+----+----+----+
|4   |9  |8  |7  |6  |2   |3   |4   |
|5   |8  |7  |6  |5  |3   |4   |5   |
|6   |7  |6  |5  |4  |4   |5   |6   |
|7   |6  |5  |4  |3  |5   |6   |7   |
|8   |5  |4  |3  |2  |6   |7   |8   |
|9   |4  |3  |2  |1  |7   |8   |9   |
+----+---+---+---+---+----+----+----+

Поэтому мне нужно выражение в этом случае для следующих двух выражений:

col(i) = 2 *column(i) + 3* c(i)

и,

col(i) = 2* column(i) + col(i)

...