Преобразовать набор данных с пустыми данными для дат - PullRequest
0 голосов
/ 15 мая 2018

У меня есть набор данных с датой, учетной записью и значением. Я хочу преобразовать набор данных в новый набор данных, в котором, если элемент accountid отсутствует на определенную дату, а затем добавить значение accountid со значением 0 против этой даты. Возможно ли это

    val df = sc.parallelize(Seq(("2018-01-01", 100.5,"id1"),
  ("2018-01-02", 120.6,"id1"),
  ("2018-01-03", 450.2,"id2")
  )).toDF("date", "val","accountid")
    +----------+-----+---------+
|      date|  val|accountid|
+----------+-----+---------+
|2018-01-01|100.5|      id1|
|2018-01-02|120.6|      id1|
|2018-01-03|450.2|      id2|
+----------+-----+---------+

Я хочу преобразовать этот набор данных в этот формат

+----------+-----+---------+
|      date|  val|accountid|
+----------+-----+---------+
|2018-01-01|100.5|      id1|
|2018-01-01|  0.0|      id2|
|2018-01-02|120.6|      id1|
|2018-01-02|  0.0|      id2|
|2018-01-03|450.2|      id2|
|2018-01-03|0.0  |      id1|
+----------+-----+---------+

Ответы [ 2 ]

0 голосов
/ 15 мая 2018

Вы можете просто использовать функцию udf, чтобы выполнить ваше требование.

Но перед этим вам нужно будет получить полный набор из учитывает и получает транслируется для использования в функции udf.

Возвращенный массив из функции udf должен быть взорван и , наконец, выбрать столбцы .

import org.apache.spark.sql.functions._
val idList = df.select(collect_set("accountid")).first().getAs[Seq[String]](0)

val broadCastedIdList = sc.broadcast(idList)

def populateUdf = udf((date: String, value: Double, accountid: String)=> Array(accounts(date, value, accountid)) ++ broadCastedIdList.value.filterNot(_ == accountid).map(accounts(date, 0.0, _)))

df.select(populateUdf(col("date"), col("val"), col("accountid")).as("struct"))
    .withColumn("struct", explode(col("struct")))
    .select(col("struct.date"), col("struct.value").as("val"), col("struct.accountid"))
  .show(false)

И, конечно, вам понадобится case class

case class accounts(date:String, value:Double, accountid:String)

, который должен дать вам

+----------+-----+---------+
|date      |val  |accountid|
+----------+-----+---------+
|2018-01-01|100.5|id1      |
|2018-01-01|0.0  |id2      |
|2018-01-02|120.6|id1      |
|2018-01-02|0.0  |id2      |
|2018-01-03|450.2|id2      |
|2018-01-03|0.0  |id1      |
+----------+-----+---------+

Примечание: используется ключевое слово valueв случае класса, потому что зарезервированные имена идентификаторов не могут быть использованы в качестве имен переменных

0 голосов
/ 15 мая 2018

Вы можете создать ссылку

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row

val Row(minTs: Long, maxTs: Long) = df
  .select(to_date($"date").cast("timestamp").cast("bigint") as "date")
  .select(min($"date"), max($"date")).first

val by =  60 * 60 * 24

val ref = spark
  .range(minTs, maxTs + by, by)
  .select($"id".cast("timestamp").cast("date").cast("string").as("date"))
  .crossJoin(df.select("accountid").distinct)

и внешнее объединение с входными данными:

ref.join(df, Seq("date", "accountid"), "leftouter").na.fill(0.0).show
// +----------+---------+-----+      
// |      date|accountid|  val|
// +----------+---------+-----+
// |2018-01-03|      id1|  0.0|
// |2018-01-01|      id1|100.5|
// |2018-01-02|      id2|  0.0|
// |2018-01-02|      id1|120.6|
// |2018-01-03|      id2|450.2|
// |2018-01-01|      id2|  0.0|
// +----------+---------+-----+

Концепция принята от этого sparklyr ответа user6910411 .

...