Создать Dataframe из вывода функции - PullRequest
0 голосов
/ 01 октября 2018

В приведенном ниже коде scala используется функция для подсчета количества дней между вводом пользователем даты start и end.Затем он выполняет итерацию / чтение файлов aws s3, которые находятся в этом временном диапазоне.

def getS3Data(s3Loc: String): DataFrame = {
println(s"Reading Avro from ${s3Loc}")
val cosimDf = spark.read.format("com.databricks.spark.avro")
  .load(s3Loc)
  cosimDf
}

val daysCount = Days.daysBetween(start,end).getDays()
(0 until daysCount).map(start.plusDays(_)).foreach{x => {var myDir ="s3://AWS.Bucket/parsed/" + x.toString("yyyyMMdd") +"/"

val myDf = getS3Data(myDir)
}}

У меня проблемы с созданием фрейма данных, который представляет собой набор всех файлов, через которые итерировала вышеупомянутая функция.Я думаю, что возможно использование .collect или применение вывода к искушаемому может работать, но, похоже, не могу понять, как.

Любая помощь приветствуется.

1 Ответ

0 голосов
/ 01 октября 2018

Вы можете создавать кадры данных в цикле

Если вам известен тип записей, хранящихся в файлах avro, просто замените foreach на foldLeft и установите начальное значение как пустой блок данных данного типа.

case class YourS3RowCaseClass(...) // replace with your type

val daysCount = Days.daysBetween(start,end).getDays()
val finalDf = (0 until daysCount).map(start.plusDays(_))
  .foldLeft(spark.createDataFrame(Seq.empty[YourS3RowCaseClass])){
    case (df, x) => 
        var myDir ="s3://AWS.Bucket/parsed/" + x.toString("yyyyMMdd") +"/"
        val myDf = getS3Data(myDir)
        df.union(myDf)
  }

Если вам нужно вывести тип из кадра данных, он может быть немного сложнее, чем

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...