добавить несколько столбцов в scala фрейм данных Spark на основе одного столбца - PullRequest
0 голосов
/ 04 мая 2020

У меня есть scala spark DataFrame, который я хотел бы разделить на HDFS на основе нескольких производных одного из его столбцов. Все выводы основаны на анализе указанного исходного столбца. После синтаксического анализа я получаю объект, который затем могу использовать для сбора всех моих производных значений. Я намерен добавить столбцы к DataFrame, представляющим ea. производный раздел. Количество производных разделов - динамическое c (две последовательности за пределами def - это stati c для простоты). На данный момент лучшее решение, которое я смог придумать, включает в себя withColumn:

import org.apache.spark.sql
import sql.{DataFrame, DataFrameWriter, Row}
import sql.functions.{udf, col}

import java.time
import time.LocalDate
import time.Instant.ofEpochMilli
import time.ZoneId.of

lazy val dateComponents = Seq[LocalDate => Int](_.getYear, _.getDayOfYear, _.getMonthValue, _.getDayOfMonth)
lazy val partitionCols = Seq("YEAR", "DOY", "MONTH", "DOM")

def partitionBy(df0: DataFrame, feederColumn: String): DataFrameWriter[Row] = {
  val df = dateComponents.zip(partitionCols).foldLeft(df0) { case (tempDf, (dateComp, partLabel) =>
    /* UDF instead of spark-native functions because SNFs, like `from_unixtime`, do
       not readily accommodate time zones outside the current system one */
    val f = udf {
      millisFromEpoch: Long => 
        val date = ofEpochMilli(millisFromEpoch).atZone(of("+5:30")).toLocalDate
        dateComp(date)
    }
    df0.withColumn(partLabel, f(col(feederColumn)))
  }
  df.write.partitionBy(partitionCols:_*)
}

Как вы, вероятно, можете сказать, UDF анализирует одни и те же значения "фидера" столько раз, сколько они получены. столбцы. Более детально, date вычисляется для каждого значения столько раз, сколько существует производных разделов (хотя это одно и то же значение ea. Time), в то время как единственная оценка, которая требует, чтобы выполнялась для каждого раздела, это dateComp(date). Это не оптимально. Есть ли способ проанализировать только один раз для всех производных столбцов (я думаю, промежуточный столбец, содержащий значения date)? Или даже лучше: кормить все производные значения, когда я читаю исходные поля (я думаю, что w / df.map, но после долгих попыток не вышло с чем-то работоспособным из-за динамики c характер мощности производных разделов - единственный способ, который я вижу, - это сказать, что параметр map возвращает кортеж. Но кортежи не создаются динамически, если мы не используем Shapeless, с которым я не знаком, но при необходимости ) .

...