Невозможно сгладить массив DataFrames - PullRequest
0 голосов
/ 14 апреля 2020

У меня есть массив DataFrame s, который я получаю, используя randomSplit() таким образом:

val folds = df.randomSplit(Array.fill(5)(1.0/5)) //Array[Dataset[Row]]

Я буду перебирать сгибы, используя для l oop, где я буду помещать i -й элемент в folds и хранить его отдельно. Тогда я буду использовать все остальные в качестве другого DataFrame, как в моем коде ниже:

    val df = spark.read.format("csv").load("xyz")
    val folds = df.randomSplit(Array.fill(5)(1.0/5))

    for (i <- folds.indices) {
        var ts = folds
        val testSet = ts(i)
        ts = ts.drop(i)

        var trainSet = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], testSet.schema)

        for (j <- ts.indices) {
            trainSet = trainSet.union(ts(j))
        }
    }

Хотя это и служит моей цели, я также пробовал другой подход, в котором я бы все еще разделял folds на ts и testSet, а затем используйте функцию flatten для оставшихся внутри ts, чтобы создать еще один DataFrame, используя что-то вроде этого:

    val df = spark.read.format("csv").load("xyz")
    val folds = df.randomSplit(Array.fill(5)(1.0/5))

    for (i <- folds.indices) {
        var ts = folds
        val testSet = ts(i)
        ts = ts.drop(i)

        var trainSet = ts.flatten
    }

Но при инициализации trainSet строка, я получаю сообщение об ошибке: No Implicits Found for parameter asTrav: Dataset[Row] => Traversable[U_]. Я также сделал import spark.implicits._ после инициализации SparkSession.

Моя конечная цель с созданием trainSet после flatten состоит в том, чтобы получить DataFrame, созданный после объединения (объединения) с другим Dataset[Row] внутри ts. Я не уверен, где я ошибаюсь.

Я использую Spark 2.4.5 с Scala 2.11.12

РЕДАКТИРОВАТЬ 1: Добавлен способ чтения Dataframe

Ответы [ 2 ]

0 голосов
/ 01 мая 2020

Я заменил это for l oop простым уменьшением:

val trainSet = ts.reduce((a,b) => a.union(b))
0 голосов
/ 14 апреля 2020

Я не уверен, что вы намерены здесь сделать, но вместо использования изменяемых переменных и выравнивания вы можете сделать рекурсивную итерацию следующим образом:


  val folds = df.randomSplit(Array.fill(5)(1.0/5))   //Array[Dataset[Row]]
  val testSet = spark.createDataFrame(Seq.empty)
  val trainSet = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], testSet.schema)

  go(folds, Array.empty)

  def go(items: Array[Dataset[Row]], result: Array[Dataset[Row]]): Array[Dataset[Row]] = items match {
    case arr @ Array(_, _*) =>
      val res = arr.map { t =>
        trainSet.union(t)
      }
      go(arr.tail, result ++ res)
    case Array() => result
  }

Как я видел сценарий использования testSet, там его нельзя использовать в теле метода

...