Scala spark: создание списка набора данных из операции с картой набора данных - PullRequest
0 голосов
/ 04 апреля 2020

Предположим, я хочу создать 2 типа метри c: metricA или metricB после преобразования другого набора данных. Если определенное условие выполнено, оно будет генерировать как metricA, так и B, если условие не будет выполнено, генерировать только metri c A. Идея состоит в том, чтобы записать 2 метрики в 2 разных пути (pathA, pathB).

Подход, который я выбрал, состоял в том, чтобы создать набор данных GeneralMetri c, а затем, основываясь на том, что внутри, писать по разным путям, но, очевидно, это не сработало, поскольку сопоставление с образцом внутри набора данных не сработало

val s: SparkSession = SparkSession
    .builder()
    .appName("Metric")
    .getOrCreate()
import s.implicits._

case class original (id : Int, units: List[Double])

case class MetricA (a: Int, b: Int, filtered_unit: List[Double])
case class MetricB (a: Int, filtered_unit: List[Double])
case class GeneralMetric(metricA: MetricA, metricB: Option[MetricB])

def createA: MetricA = {
    MetricA(1, 1, List(1.0, 2.0)
}

def createB: MetricB = {
    MetricB(1, List(10.0, 20.0)
}
def create (isBoth: Boolean): GeneralMetric = {
    if(isBoth) {
       val a: MetricA = createA()
       val b: MetricB = createB()
       GeneralMetric(a, Some(b))
    }
    else {
       val a: MetricA = createA()
       GeneralMetric(a, None)
    }
}

val originalDF: DataFrame

val result : Dataset[GeneralMetric] =
                 originalDF.as[original]
                 .map { r =>
                      if(r.id == 21) create(true)
                      else create(false)
                 }

val pathA: String = "s3://pathA"
val pathB: String = "s3://pathB"

//below code obviously wouldn't work
result.map(x => {
    case (metricA, Some(metricB)) => {
      metricA.write.parquet(pathA)
      metricB.write.parquet(pathB)
    }
    case (metricA, None) => metricA.write.parquet(pathA)

  })

Следующим подходом, о котором я думал, было поместить результаты в список [GeneralMetric], где GeneralMetri c представляет собой запечатанный след , расширенный как MetricA, так и MetricB, но Как я могу сделать преобразование набора данных вернуть список GeneralMetri c.

Любые идеи будут полезны

1 Ответ

1 голос
/ 04 апреля 2020

Почему бы

result.map({
    case (metricA, Some(metricB)) =>
      metricA.write.parquet(pathA)
      metricB.write.parquet(pathB)
    case (metricA, None) => metricA.write.parquet(pathA)

  })

не работать в вашем случае? Это просто проблема синтаксиса?


Также: кажется, что вы отправляете метрики независимо (или, по крайней мере, в этом примере). Вы можете смоделировать его как:

sealed trait Metric {
  def write
}
case class MetricA (a: Int, b: Int, filtered_unit: List[Double]) extends Metric {
  override def write: Unit = ???
}
case class MetricB (a: Int, filtered_unit: List[Double]) extends Metric {
  override def write: Unit = ???
}

и позвонить

implicit val enc: Encoder[Metric] = Encoders.kryo[Metric]
val result: Dataset[Metric] =
    originalDF.as[original]
      .flatMap { r =>
        if (r.id == 21) createA :: createB :: Nil
        else createA :: Nil
      }
result.foreach(metric.write.parquet())
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...