Spark Агрегирование нескольких столбцов (возможно для массива) из выходных данных соединения - PullRequest
0 голосов
/ 07 января 2019

У меня ниже наборов данных

Table1

Table1

Table2

enter image description here

Теперь я бы хотел получить набор данных ниже. Я попытался с левым внешним соединением Table1.id == Table2.departmentid, но я не получаю желаемый результат.

enter image description here

Позже мне нужно использовать эту таблицу, чтобы получить несколько отсчетов и преобразовать данные в xml. Я буду делать это преобразование, используя карту.

Любая помощь будет оценена.

Ответы [ 2 ]

0 голосов
/ 07 января 2019

Только объединения недостаточно, чтобы получить желаемый результат. Возможно, вы что-то упустили, и последний элемент каждого вложенного массива может быть departmentid. Предполагая, что последний элемент вложенного массива равен departmentid, я сгенерировал вывод следующим образом:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.collect_list

case class department(id: Integer, deptname: String)
case class employee(employeid:Integer, empname:String, departmentid:Integer)

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
val department_df = Seq(department(1, "physics")
                            ,department(2, "computer") ).toDF()
val emplyoee_df = Seq(employee(1, "A", 1)
                      ,employee(2, "B", 1)
                      ,employee(3, "C", 2)
                      ,employee(4, "D", 2)).toDF()

val result = department_df.join(emplyoee_df, department_df("id") === emplyoee_df("departmentid"), "left").
      selectExpr("id", "deptname", "employeid", "empname").
      rdd.map {
        case Row(id:Integer, deptname:String, employeid:Integer, empname:String) => (id, deptname, Array(employeid.toString, empname, id.toString))
      }.toDF("id", "deptname", "arrayemp").
          groupBy("id", "deptname").
          agg(collect_list("arrayemp").as("emplist")).
        orderBy("id", "deptname")

Вывод выглядит так:

result.show(false)
+---+--------+----------------------+
|id |deptname|emplist               |
+---+--------+----------------------+
|1  |physics |[[2, B, 1], [1, A, 1]]|
|2  |computer|[[4, D, 2], [3, C, 2]]|
+---+--------+----------------------+

Объяснение: Если я разбью последнее преобразование кадра данных на несколько шагов, это, вероятно, прояснит, как генерируется вывод.

оставлено внешнее объединение между отделом_df и сотрудником_df

val df1 = department_df.join(emplyoee_df, department_df("id") === emplyoee_df("departmentid"), "left").
      selectExpr("id", "deptname", "employeid", "empname")
df1.show()
    +---+--------+---------+-------+
| id|deptname|employeid|empname|
+---+--------+---------+-------+
|  1| physics|        2|      B|
|  1| physics|        1|      A|
|  2|computer|        4|      D|
|  2|computer|        3|      C|
+---+--------+---------+-------+

создание массива с использованием значений некоторых столбцов из кадра данных df1

val df2 = df1.rdd.map {
                case Row(id:Integer, deptname:String, employeid:Integer, empname:String) => (id, deptname, Array(employeid.toString, empname, id.toString))
              }.toDF("id", "deptname", "arrayemp")
df2.show()
            +---+--------+---------+
        | id|deptname| arrayemp|
        +---+--------+---------+
        |  1| physics|[2, B, 1]|
        |  1| physics|[1, A, 1]|
        |  2|computer|[4, D, 2]|
        |  2|computer|[3, C, 2]|
        +---+--------+---------+

создать новый список, объединяющий несколько массивов, используя df2 dataframe

val result = df2.groupBy("id", "deptname").
              agg(collect_list("arrayemp").as("emplist")).
              orderBy("id", "deptname")
result.show(false)
            +---+--------+----------------------+
        |id |deptname|emplist               |
        +---+--------+----------------------+
        |1  |physics |[[2, B, 1], [1, A, 1]]|
        |2  |computer|[[4, D, 2], [3, C, 2]]|
        +---+--------+----------------------+
0 голосов
/ 07 января 2019
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val df = spark.sparkContext.parallelize(Seq(
   (1,"Physics"),
   (2,"Computer"),
   (3,"Maths")
 )).toDF("ID","Dept")

 val schema = List(
    StructField("EMPID", IntegerType, true),
    StructField("EMPNAME", StringType, true),
    StructField("DeptID", IntegerType, true)
  )

  val data = Seq(
    Row(1,"A",1),
    Row(2,"B",1),
    Row(3,"C",2),
    Row(4,"D",2) ,
    Row(5,"E",null)
  )

  val df_emp = spark.createDataFrame(
    spark.sparkContext.parallelize(data),
    StructType(schema)
  )

  val newdf =  df_emp.withColumn("CONC",array($"EMPID",$"EMPNAME",$"DeptID")).groupBy($"DeptID").agg(expr("collect_list(CONC) as emplist"))

  df.join(newdf,df.col("ID") === df_emp.col("DeptID")).select($"ID",$"Dept",$"emplist").show()

---+--------+--------------------+
| ID|    Dept|             listcol|
+---+--------+--------------------+
|  1| Physics|[[1, A, 1], [2, B...|
|  2|Computer|[[3, C, 2], [4, D...|
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...