Определите пользовательский интерфейс Spark Scala с параметром Option в качестве входного параметра - PullRequest
0 голосов
/ 16 января 2019

Написал следующий UDF с целью заставить его обрабатывать случай, когда один параметр не определен. Ниже приведен код:

val addTimeFromCols: UserDefinedFunction = udf((year: String, month: String, day: String, hour: String) => {
      Option(hour) match {
        case None    => (List(year, month, day).mkString(DASH_SEP)).concat(SPACE).concat(defaultHour)
        case Some(x) => (List(year, month, day).mkString(DASH_SEP)).concat(SPACE).concat(hour)
      }
    })

 def addTimestampFromFileCols(): DataFrame = df
  .withColumn(COLUMN_TS, addTimeFromCols(col(COLUMN_YEAR), col(COLUMN_MONTH), col(COLUMN_DAY), col(COLUMN_HOUR)).cast(TimestampType))

Моя цель состоит в том, чтобы сделать эту функцию применимой для всех случаев использования (фрейм данных, имеющий столбцы HOUR и другие, у которых в этом случае не будет этого столбца, я определяю значение по умолчанию. К сожалению, это не работает, когда я тестирую снова В кадре данных без столбца я получаю следующую ошибку:

cannot resolve '`HOUR`' given input columns

Любая идея, как это исправить, пожалуйста

1 Ответ

0 голосов
/ 16 января 2019

Если столбец не существует, то вы должны указать значение по умолчанию с помощью функции lit (), иначе он выдаст ошибку. Следующее сработало у меня

scala> defaultHour
res77: String = 00

scala> :paste
// Entering paste mode (ctrl-D to finish)

def addTimestampFromFileCols(df:DataFrame) =
{
val hr = if( df.columns.contains("hour") ) col(COLUMN_HOUR) else lit(defaultHour)
df.withColumn(COLUMN_TS, addTimeFromCols(col(COLUMN_YEAR), col(COLUMN_MONTH), col(COLUMN_DAY), hr).cast(TimestampType))
}

// Exiting paste mode, now interpreting.

addTimestampFromFileCols: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame

scala> 

+ ve case

scala> val df = Seq(("2019","01","10","09")).toDF("year","month","day","hour")
df: org.apache.spark.sql.DataFrame = [year: string, month: string ... 2 more fields]

scala> addTimestampFromFileCols(df).show(false)
+----+-----+---+----+-------------------+
|year|month|day|hour|tstamp             |
+----+-----+---+----+-------------------+
|2019|01   |10 |09  |2019-01-10 09:00:00|
+----+-----+---+----+-------------------+

-все кейсы

scala> val df = Seq(("2019","01","10")).toDF("year","month","day")
df: org.apache.spark.sql.DataFrame = [year: string, month: string ... 1 more field]

scala> addTimestampFromFileCols(df).show(false)
+----+-----+---+-------------------+
|year|month|day|tstamp             |
+----+-----+---+-------------------+
|2019|01   |10 |2019-01-10 00:00:00|
+----+-----+---+-------------------+

scala>
...