У меня есть scala spark DataFrame
, который я хотел бы разделить на HDFS на основе нескольких производных одного из его столбцов. Все выводы основаны на анализе указанного исходного столбца. После синтаксического анализа я получаю объект, который затем могу использовать для сбора всех моих производных значений. Я намерен добавить столбцы к DataFrame
, представляющим ea. производный раздел. Количество производных разделов - динамическое c (две последовательности за пределами def
- это stati c для простоты). На данный момент лучшее решение, которое я смог придумать, включает в себя withColumn
:
import org.apache.spark.sql
import sql.{DataFrame, DataFrameWriter, Row}
import sql.functions.{udf, col}
import java.time
import time.LocalDate
import time.Instant.ofEpochMilli
import time.ZoneId.of
lazy val dateComponents = Seq[LocalDate => Int](_.getYear, _.getDayOfYear, _.getMonthValue, _.getDayOfMonth)
lazy val partitionCols = Seq("YEAR", "DOY", "MONTH", "DOM")
def partitionBy(df0: DataFrame, feederColumn: String): DataFrameWriter[Row] = {
val df = dateComponents.zip(partitionCols).foldLeft(df0) { case (tempDf, (dateComp, partLabel) =>
/* UDF instead of spark-native functions because SNFs, like `from_unixtime`, do
not readily accommodate time zones outside the current system one */
val f = udf {
millisFromEpoch: Long =>
val date = ofEpochMilli(millisFromEpoch).atZone(of("+5:30")).toLocalDate
dateComp(date)
}
df0.withColumn(partLabel, f(col(feederColumn)))
}
df.write.partitionBy(partitionCols:_*)
}
Как вы, вероятно, можете сказать, UDF анализирует одни и те же значения "фидера" столько раз, сколько они получены. столбцы. Более детально, date
вычисляется для каждого значения столько раз, сколько существует производных разделов (хотя это одно и то же значение ea. Time), в то время как единственная оценка, которая требует, чтобы выполнялась для каждого раздела, это dateComp(date)
. Это не оптимально. Есть ли способ проанализировать только один раз для всех производных столбцов (я думаю, промежуточный столбец, содержащий значения date
)? Или даже лучше: кормить все производные значения, когда я читаю исходные поля (я думаю, что w / df.map
, но после долгих попыток не вышло с чем-то работоспособным из-за динамики c характер мощности производных разделов - единственный способ, который я вижу, - это сказать, что параметр map
возвращает кортеж. Но кортежи не создаются динамически, если мы не используем Shapeless, с которым я не знаком, но при необходимости ) .