Spark Dataframes - выводит одну строку, содержащую ненулевые значения на ключ, из нескольких таких строк - PullRequest
3 голосов
/ 19 апреля 2019

Я новичок в spark-scala и мне нужна помощь сообщества.

Это журнал приложения, каждый запрос разбит на 5-6 строк, уникальный ключ во всех строках - reqID. Каждая строка имеет несколько столбцов для сбора и Мне нужно написать 1 запись на reqID в таблице.

val jsondftemp = spark.read.json('path') to read the json file    

Мой входной файл присоединения:

{"srchTrnsPhrs":"Psychiatric Care","Nm":"bh","Num":"746","reqPlsize":"11707","reqID":"a520a039-310b-485e-9be2-3bfe51d376a2"}
{"CoreFuncStrtTm":"2019-04-16 00:00:16.356614","reqID":"a520a039-310b-485e-9be2-3bfe51d376a2"}
{"CoreFuncEndTm":"2019-04-16 00:00:16.536903","execTm":"180","reqID":"a520a039-310b-485e-9be2-3bfe51d376a2"}

Моя схема:

|-- CoreFuncEndTm: string (nullable = true)
|-- CoreFuncStrtTm: string (nullable = true)
|-- Nm: string (nullable = true)
|-- Num : string (nullable = true)
|-- execTm: string (nullable = true)
|-- reqID: string (nullable = true)
|-- srchTrnsPhrs: string (nullable = true)
|-- reqPlsize:  string (nullable = true)    

Фрейм данных имеет:

+--------------------+--------------------+-------+-----------+--------------------+--------------------+-------+---------+
|       CoreFuncEndTm|      CoreFuncStrtTm|Nm     |execTm     |               reqID|       srchEntrdPhrs|Num    |reqPlsize|
+--------------------+--------------------+-------+-----------+--------------------+--------------------+-------+---------+
|                null|                null|     bh|       null|a520a039-310b-485...|    Psychiatric Care|   746 |   11707|
|                null|2019-04-16 00:00:...|   null|       null|a520a039-310b-485...|                null|   null|   null|
|2019-04-16 00:00:...|                null|   null|        180|a520a039-310b-485...|                null|   null|   null|
+--------------------+--------------------+-------+-----------+--------------------+--------------------+-------+-------+ 

ожидаемый результат:

+--------------------+--------------------+-------+-----------+--------------------+--------------------+-------+---------+
|       CoreFuncEndTm|      CoreFuncStrtTm|Nm     |execTm     |               reqID|       srchEntrdPhrs|Num    |reqPlsize|
+--------------------+--------------------+-------+-----------+--------------------+--------------------+-------+---------+
|2019-04-16 00:00:...|2019-04-16 00:00:...|     bh|        180|a520a039-310b-485...|    Psychiatric Care|   746 |   11707|

Любая помощь в этом действительно приветствуется.
ReqID - это ключ для объединения всех строк, который путают с Reducebykey и группируют по ключевым операциям.

1 Ответ

1 голос
/ 21 апреля 2019

Простой подход только от загруженного DF.

  1. Явное именование столбцов, но может быть динамическим с (_) и т. Д.
  2. Те же типы.
  3. Необходимо оценить, как обрабатываются ваши нулевые значения.
  4. Обрабатывает любой формат данных, который вы можете использовать в общем.

Вот некоторые вкусности с парой приемов, но не перегружающие новичка, как это было:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

import spark.implicits._

val colAggList = List("a", "b", "c", "d")
val dropCols = Seq("a", "b", "c", "d") 

val convToString = udf((arr: Seq[String]) => arr.mkString(",")) // Could just get the 1st element via data.withColumn("newcolname", $"colname"(0))

val df = sc.parallelize(Seq(
   ("r1", Some(1), Some(1), None, Some("x")),
   ("r1", None, None, Some(3), None),
   ("r2", Some(6), Some(4), None, Some("y")),
   ("r3", None, Some(1), Some(5), Some("abc")),
   ("r3", Some(4), None, None, None),
   ("r4", Some(1), None, None, None),
   ("r4", None, Some(2), None, None),
   ("r4", None, None, Some(3), None),
   ("r4", None, None, None, Some("xyz")),
   ("r5", Some(1), Some(2), Some(7), Some("A"))
   )).toDF("ID", "a", "b", "c", "d")
df.show(false)
df.printSchema()

// Note Nones, nulls are not collected.
val df2 = df.groupBy("ID").agg( collect_list(colAggList(0)).as("a"), collect_list(colAggList(1)).as("b"), collect_list(colAggList(2)).as("c"),     collect_list(colAggList(3)).as("d") ) 
df2.show(false)
df2.printSchema()

val df3 = df2.withColumn("aStr", convToString($"a")).withColumn("bStr", convToString($"b")).withColumn("cStr", convToString($"c")).withColumn("dStr", convToString($"d")).drop(dropCols:_*)
df3.show(false)
df3.printSchema()

возвращается, и поэтому вы можете видеть, как это работает - только показывается исходный и окончательный результат:

+---+----+----+----+----+
|ID |a   |b   |c   |d   |
+---+----+----+----+----+
|r1 |1   |1   |null|x   |
|r1 |null|null|3   |null|
|r2 |6   |4   |null|y   |
|r3 |null|1   |5   |abc |
|r3 |4   |null|null|null|
|r4 |1   |null|null|null|
|r4 |null|2   |null|null|
|r4 |null|null|3   |null|
|r4 |null|null|null|xyz |
|r5 |1   |2   |7   |A   |
+---+----+----+----+----+


+---+----+----+----+----+
|ID |aStr|bStr|cStr|dStr|
+---+----+----+----+----+
|r1 |1   |1   |3   |x   |
|r5 |1   |2   |7   |A   |
|r2 |6   |4   |    |y   |
|r4 |1   |2   |3   |xyz |
|r3 |4   |1   |5   |abc |
+---+----+----+----+----+

Обратите внимание на придуманное пропущенное значение, показанное как пустое.

...