Добавить значения в фрейм данных против определенного идентификатора в Spark Scala - PullRequest
0 голосов
/ 03 мая 2020

Мой код:

var data = spark.read.option("header", "true")
      .option("inferSchema", "true")
      .csv("src/main/resources/student.csv")
        data.show()

Данные выглядят так:

ID  Name  City
1   Ali   swl
2   Sana  lhr
3   Ahad  khi
4   ABC   fsd

Теперь у меня есть список значений, таких как (1,2,1).

val nums: List[Int] = List(1, 2, 1)

И я хочу добавить эти значения к ID = 3. Так что DataFrame может выглядеть следующим образом.

ID  Name  City  newCol  newCol2  newCol3
1   Ali   swl    null     null    null
2   Sana  lhr    null     null    null
3   Ahad  khi     1        2        1
4   ABC   fsd    null     null    null

Интересно, возможно ли это? Любая помощь будет оценена. Спасибо

Ответы [ 2 ]

1 голос
/ 03 мая 2020

Да, это возможно.

Используйте when для заполнения совпадающих значений и otherwise для несоответствующих значений.

Я использовал zipWithIndex для создания уникальных имен столбцов.

Пожалуйста, проверьте ниже код.

scala> import org.apache.spark.sql.functions._

scala> val df = Seq((1,"Ali","swl"),(2,"Sana","lhr"),(3,"Ahad","khi"),(4,"ABC","fsd")).toDF("id","name","city") // Creating DataFrame with given sample data.
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> val nums = List(1,2,1) // List values.
nums: List[Int] = List(1, 2, 1)

scala> val filterData = List(3,4)

scala> spark.time{ nums.zipWithIndex.foldLeft(df)((df,c) => df.withColumn(s"newCol${c._2}",when($"id".isin(filterData:_*),c._1).otherwise(null))).show(false) } // Used zipWithIndex to make column names unique.
+---+----+----+-------+-------+-------+
|id |name|city|newCol0|newCol1|newCol2|
+---+----+----+-------+-------+-------+
|1  |Ali |swl |null   |null   |null   |
|2  |Sana|lhr |null   |null   |null   |
|3  |Ahad|khi |1      |2      |1      |
|4  |ABC |fsd |1      |2      |1      |
+---+----+----+-------+-------+-------+

Time taken: 43 ms

scala>

1 голос
/ 03 мая 2020

Сначала вы можете преобразовать его в DataFrame с одним столбцом массива, а затем «разложить» столбец массива на столбцы следующим образом:

import org.apache.spark.sql.functions.{col, lit}
import spark.implicits._

val numsDf =
  Seq(nums)
    .toDF("nums")
    .select(nums.indices.map(i => col("nums")(i).alias(s"newCol$i")): _*)

После этого вы можете использовать внешнее объединение для объединения data numsDf с условием ID == 3 следующим образом:

val resultDf = data.join(numsDf, data.col("ID") === lit(3), "outer") 

resultDf.show() выведет:

+---+----+----+-------+-------+-------+
| ID|Name|City|newCol0|newCol1|newCol2|
+---+----+----+-------+-------+-------+
|  1| Ali| swl|   null|   null|   null|
|  2|Sana| lhr|   null|   null|   null|
|  3|Ahad| khi|      1|      2|      3|
|  4| ABC| fsd|   null|   null|   null|
+---+----+----+-------+-------+-------+

Убедитесь, что вы добавили опцию spark.sql.crossJoin.crossJoin.enabled = true в сеанс spark:

val spark = SparkSession.builder()
  ...
  .config("spark.sql.crossJoin.enabled", value = true)
  .getOrCreate()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...