Как создать или преобразовать в набор данных в набор данных [строка] - PullRequest
0 голосов
/ 04 февраля 2020

Здравствуйте, я пытаюсь протестировать следующую функцию, которая принимает набор данных [Row] в качестве параметра

 def getReducedSubsidiaries(dsSubsidiaries: Dataset[Row])(implicit spark: SparkSession): Dataset[SubsidiariesImpacted] = {
    import spark.implicits._
    dsSubsidiaries.as[SubsidiariesImpactedStage]
      .groupByKey(_.subsidiary_uuid)
        .reduceGroups((a, b) => if (a.event_timestamp.compareTo(b.event_timestamp) >= 0) a else b)
        .map(_._2)
        .select(
          $"subsidiary_uuid",
          $"subsidiary_id",
          $"company_uuid"
        )
        .as[SubsidiariesImpacted]
  }

Я пытаюсь создать DataSet для передачи через эту функцию, но я не уверен, как чтобы преобразовать этот набор данных, который я создал, в набор данных [Row], который ему нужен.


      val ts1 = Timestamp.valueOf("2019-08-01 00:00:00")
      val ts2 = Timestamp.valueOf("2019-09-20 00:00:00")
      val ts3 = Timestamp.valueOf("2019-11-27 00:00:00")
      val subsidiaries:Dataset[SubsidiariesImpactedStage] = Seq(
        SubsidiariesImpactedStage(ts1,"active","sub_uuid1",32L,"comp_uuid1"),
        SubsidiariesImpactedStage(ts2, "inactive","sub_uuid1",32L, "comp_uuid1"),
        SubsidiariesImpactedStage(ts3, "active", "sub_uuid1", 5L, "latest_comp_uuid1")
      ).toDS()

Ответы [ 2 ]

1 голос
/ 04 февраля 2020

Вы можете использовать Dataset.toDF()

case class SubsidiariesImpactedStage(t : Timestamp, a: String, b:String, c : Long, d :String )

    val ts1 = Timestamp.valueOf("2019-08-01 00:00:00")
    val ts2 = Timestamp.valueOf("2019-09-20 00:00:00")
    val ts3 = Timestamp.valueOf("2019-11-27 00:00:00")

    val subsidiaries:Dataset[SubsidiariesImpactedStage] = Seq(
      SubsidiariesImpactedStage(ts1,"active","sub_uuid1",32L,"comp_uuid1"),
      SubsidiariesImpactedStage(ts2, "inactive","sub_uuid1",32L, "comp_uuid1"),
      SubsidiariesImpactedStage(ts3, "active", "sub_uuid1", 5L, "latest_comp_uuid1")
    ).toDS()

    val df = subsidiaries.toDF()
    println(df.getClass)
    df.show()

Result- dataframe is Dataset [Row]

class org.apache.spark.sql.Dataset
+-------------------+--------+---------+---+-----------------+
|                  t|       a|        b|  c|                d|
+-------------------+--------+---------+---+-----------------+
|2019-08-01 00:00:00|  active|sub_uuid1| 32|       comp_uuid1|
|2019-09-20 00:00:00|inactive|sub_uuid1| 32|       comp_uuid1|
|2019-11-27 00:00:00|  active|sub_uuid1|  5|latest_comp_uuid1|
+-------------------+--------+---------+---+-----------------+
0 голосов
/ 04 февраля 2020

Dataframe - это DataSet [Row] (> spark 2.0)

, поэтому просто передайте Seq в DF вместо DS перед передачей в качестве аргумента функции.

case class Person(name: String, age: Int)
def fun(d:Dataset[Row])=d.show()
fun(Seq(Person("a", 1)).toDF())
...