Итерировать по кадру данных, когда каждый столбец передается для преобразования - PullRequest
0 голосов
/ 10 июля 2020

У меня есть фрейм данных со 100 столбцами и именами столбцов, такими как col1, col2, col3 .... Я хочу применить определенное преобразование к значениям столбцов на основе совпадений условий. Я могу хранить имена столбцов в массиве строк. И передайте значение каждого элемента массива в withColumn и на основе условия When i может преобразовать значения столбца по вертикали. Но вопрос в том, что Dataframe неизменяем, поэтому каждую обновленную версию необходимо хранить в новой переменной, а также новый dataframe необходимо передать withColumn для преобразования для следующей итерации. Есть ли способ создать массив фрейма данных, чтобы новый фрейм данных можно было сохранить как элемент массива, и он мог выполнять итерацию на основе значения итератора. Или есть другой способ обработать то же самое.

var arr_df : Array[DataFrame] = new Array[DataFrame](60)   

-> Это вызывает ошибку «не найден тип DataFrame»

val df(0) = df1.union(df2)

for(i <- 1 to 99){
  val df(i) = df(i-1).withColumn(col(i), when(col(i)> 0, col(i) + 
   1).otherwise(col(i)))

Здесь col (i) - это массив строки, в которых хранятся имена столбцов исходного фрейма данных.

Например:


scala> val original_df = Seq((1,2,3,4),(2,3,4,5),(3,4,5,6),(4,5,6,7),(5,6,7,8),(6,7,8,9)).toDF("col1","col2","col3","col4")
original_df: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 2 more fields]

scala> original_df.show()

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   2|   3|   4|
|   2|   3|   4|   5|
|   3|   4|   5|   6|
|   4|   5|   6|   7|
|   5|   6|   7|   8|
|   6|   7|   8|   9|
+----+----+----+----+

Я хочу перебрать 3 столбца: col1, col2, col3, если значение этого столбца больше 3, то он будет обновлен на +1

Ответы [ 4 ]

0 голосов
/ 10 июля 2020

Вы можете использовать foldLeft всякий раз, когда хотите внести изменения в несколько columns, как показано ниже

val original_df = Seq(
  (1,2,3,4),
  (2,3,4,5),
  (3,4,5,6),
  (4,5,6,7),
  (5,6,7,8),
  (6,7,8,9)
).toDF("col1","col2","col3","col4")

//Filter the columns that yuou want to update
val columns = original_df.columns

columns.foldLeft(original_df){(acc, colName) =>
  acc.withColumn(colName, when(col(colName) > 3, col(colName) + 1).otherwise(col(colName)))
}
.show(false)

Вывод:

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|1   |2   |3   |5   |
|2   |3   |5   |6   |
|3   |5   |6   |7   |
|5   |6   |7   |8   |
|6   |7   |8   |9   |
|7   |8   |9   |10  |
+----+----+----+----+
0 голосов
/ 10 июля 2020

Проверьте код ниже.

scala> df.show(false)
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|1   |2   |3   |4   |
|2   |3   |4   |5   |
|3   |4   |5   |6   |
|4   |5   |6   |7   |
|5   |6   |7   |8   |
|6   |7   |8   |9   |
+----+----+----+----+
scala>  val requiredColumns = df.columns.zipWithIndex.filter(_._2 < 3).map(_._1).toSet
requiredColumns: scala.collection.immutable.Set[String] = Set(col1, col2, col3)
scala> val allColumns = df.columns
allColumns: Array[String] = Array(col1, col2, col3, col4)
scala> val columnExpr = allColumns.filterNot(requiredColumns(_)).map(col(_)) ++ requiredColumns.map(c => when(col(c) > 3, col(c) + 1).otherwise(col(c)).as(c))
scala> df.select(columnExpr:_*).show(false)
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|1   |2   |3   |4   |
|2   |3   |5   |5   |
|3   |5   |6   |6   |
|5   |6   |7   |7   |
|6   |7   |8   |8   |
|7   |8   |9   |9   |
+----+----+----+----+
0 голосов
/ 10 июля 2020

Вы можете перебирать все столбцы и применять условие в одной строке, как показано ниже,

original_df.select(original_df.columns.map(c => (when(col(c) > lit(3), col(c)+1).otherwise(col(c))).alias(c)):_*).show()


+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   2|   3|   5|
|   2|   3|   5|   6|
|   3|   5|   6|   7|
|   5|   6|   7|   8|
|   6|   7|   8|   9|
|   7|   8|   9|  10|
+----+----+----+----+
0 голосов
/ 10 июля 2020

Если я вас правильно понимаю, вы пытаетесь выполнить операцию с кадром данных. для этого не нужно повторять. Я могу показать вам, как это можно сделать в pyspark. вероятно, его можно перенести в scala.

from pyspark.sql import functions as F
tst= sqlContext.createDataFrame([(1,7,0),(1,8,4),(1,0,10),(5,1,90),(7,6,0),(0,3,11)],schema=['col1','col2','col3'])
expr = [F.when(F.col(coln)>3,F.col(coln)+1).otherwise(F.col(coln)).alias(coln) for coln in tst.columns if 'col3' not in coln]
tst1= tst.select(*expr)

результаты:

tst1.show()
+----+----+
|col1|col2|
+----+----+
|   1|   8|
|   1|   9|
|   1|   0|
|   6|   1|
|   8|   7|
|   0|   3|
+----+----+

Это должно дать вам желаемый результат

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...