Как взять данные из нескольких паркетных файлов одновременно? - PullRequest
0 голосов
/ 24 декабря 2018

Мне нужна ваша помощь, потому что я новичок в Spark Framework.

У меня есть папка с большим количеством паркетных файлов.Название этих файлов имеет одинаковый формат: DD-MM-YYYY.Например: '01-10-2018', '02-10-2018', '03-10-2018' и т. Д.

Мое приложение имеет два входных параметра: dateFrom и dateTo.

Когда я пытаюсь использовать следующийкод приложения зависает.Похоже, приложение сканирует все файлы в папке.

val mf = spark.read.parquet("/PATH_TO_THE_FOLDER/*")
         .filter($"DATE".between(dateFrom + " 00:00:00", dateTo + " 23:59:59"))
mf.show()

Мне нужно использовать пул данных за период как можно быстрее.

Я думаю, было бы здорово разделить период на дни.и затем читайте файлы отдельно, присоединяйте их так:

val mf1 = spark.read.parquet("/PATH_TO_THE_FOLDER/01-10-2018");
val mf2 = spark.read.parquet("/PATH_TO_THE_FOLDER/02-10-2018");

val final = mf1.union(mf2).distinct();

dateFrom и dateTo являются динамическими, поэтому я не знаю, как правильно организовать код прямо сейчас.Пожалуйста, помогите!


@ y2k-shubham Я пытался проверить следующий код, но он вызывает ошибку:

import org.joda.time.{DateTime, Days}
import org.apache.spark.sql.{DataFrame, SparkSession}

val dateFrom = DateTime.parse("2018-10-01")
val dateTo = DateTime.parse("2018-10-05")

def getDaysInBetween(from: DateTime, to: DateTime): Int = Days.daysBetween(from, to).getDays

def getDatesInBetween(from: DateTime, to: DateTime): Seq[DateTime] = {
    val days = getDaysInBetween(from, to)
    (0 to days).map(day => from.plusDays(day).withTimeAtStartOfDay())
}

val datesInBetween: Seq[DateTime] = getDatesInBetween(dateFrom, dateTo)

val unionDf: DataFrame = datesInBetween.foldLeft(spark.emptyDataFrame) { (intermediateDf: DataFrame, date: DateTime) =>
    intermediateDf.union(spark.read.parquet("PATH" + date.toString("yyyy-MM-dd") + "/*.parquet"))
}
unionDf.show()

ОШИБКА :

org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 0 columns and the second table has 20 columns;

Кажется, что intermediateDf DateFrame при запуске пуст.Как решить проблему?

Ответы [ 2 ]

0 голосов
/ 25 декабря 2018
import java.time.LocalDate
import java.time.format.DateTimeFormatter

import org.apache.spark.sql.{DataFrame, SparkSession}

val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")

def dateRangeInclusive(start: String, end: String): Iterator[LocalDate] = {
  val startDate = LocalDate.parse(start, formatter)
  val endDate = LocalDate.parse(end, formatter)
  Iterator.iterate(startDate)(_.plusDays(1))
    .takeWhile(d => d.isBefore(endDate) || d.isEqual(endDate))
}

val spark = SparkSession.builder().getOrCreate()
val data: DataFrame = dateRangeInclusive("2018-10-01", "2018-10-05")
  .map(d => spark.read.parquet(s"/path/to/directory/${formatter.format(d)}"))
  .reduce(_ union _)

Я также предлагаю использовать собственный API JSR 310 (часть Java SE начиная с Java 8), а не Joda-Time, поскольку он более современный и не требует внешних зависимостей.Обратите внимание, что сначала создать последовательность путей и выполнить map + lower для этого варианта использования, вероятно, проще, чем более общее решение на основе foldLeft.

Кроме того, вы можете использовать reduceOption, а затемполучить Option[DataFrame], если введенный диапазон дат пуст.Кроме того, если некоторые входные каталоги / файлы могут отсутствовать, вы должны выполнить проверку перед вызовом spark.read.parquet.Если ваши данные хранятся в HDFS, вам, вероятно, следует использовать API Hadoop FS:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val spark = SparkSession.builder().getOrCreate()
val fs = FileSystem.get(new Configuration(spark.sparkContext.hadoopConfiguration))
val data: Option[DataFrame] = dateRangeInclusive("2018-10-01", "2018-10-05")
  .map(d => s"/path/to/directory/${formatter.format(d)}")
  .filter(p => fs.exists(new Path(p)))
  .map(spark.read.parquet(_))
  .reduceOption(_ union _)
0 голосов
/ 24 декабря 2018

Хотя я не тестировал этот фрагмент кода, он должен работать (возможно, небольшие изменения?)

import org.joda.time.{DateTime, Days}
import org.apache.spark.sql.{DataFrame, SparkSession}

// return no of days between two dates
def getDaysInBetween(from: DateTime, to: DateTime): Int = Days.daysBetween(from, to).getDays

// return sequence of dates between two dates
def getDatesInBetween(from: DateTime, to: DateTime): Seq[DateTime] = {
  val days = getDaysInBetween(from, to)
  (0 to days).map(day => from.plusDays(day).withTimeAtStartOfDay())
}

// read parquet data of given date-range from given path
// (you might want to pass SparkSession in a different manner)
def readDataForDateRange(path: String, from: DateTime, to: DateTime)(implicit spark: SparkSession): DataFrame = {
  // get date-range sequence
  val datesInBetween: Seq[DateTime] = getDatesInBetween(from, to)

  // read data of from-date (needed because schema of all DataFrames should be same for union)
  val fromDateDf: DataFrame = spark.read.parquet(path + "/" + datesInBetween.head.toString("yyyy-MM-dd"))

  // read and union remaining dataframes (functionally)
  val unionDf: DataFrame = datesInBetween.tail.foldLeft(fromDateDf) { (intermediateDf: DataFrame, date: DateTime) =>
    intermediateDf.union(spark.read.parquet(path + "/" + date.toString("yyyy-MM-dd")))
  }

  // return union-df
  unionDf
}

Ссылка: Как вычислить дату интервала 'n' дней в функциональном стиле?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...