Spark и scala: объедините функцию, если - PullRequest
0 голосов
/ 17 апреля 2019

Я работал над ETL для Spark с использованием Scala, в этом ETL я хочу добавить 3 аргумента, чтобы соответственно определить repartionBy, partitionBy, orderBy для записи моего фрейма данных в хранилище. Однако эти аргументы должны быть необязательными.

Я действительно не хочу писать ужасное if...else утверждение, которое бы принимало любую комбинацию из 8 возможностей.
У меня есть функция:

def writer(
  outputFormat: String,
  outputFile: String,
  outputMode: SaveMode,
  outputRepartionBy: String,
  outputParitionBy: String,
  outputOrderBy: String,
  dryRun: Boolean = false
)(df: DataFrame): Unit = {

        if (dryRun){
            df.show(500, false)
        }else{
            if (outputFormat == "parquet" || outputFormat == "orc" ) {
                df.write.format(outputFormat).mode(outputMode).save( outputFile )
            } else {
                df.write.format(outputFormat).save(outputFile)
            }
        }
    }

Можно ли сделать что-то вроде:

df.write
.if( outputRepartionBy != null ){ repartitionby( outputRepartionBy ) }
.format( outputFormat )
.mode(outputMode)
.save( outputFile )

Было бы правильным способом связать функцию, если условие выполняется, и если нет, то есть ли такие возможности в scala / spark?

Редактировать: я на Spark 2.3.1 с Scala 2.11.12

Ответы [ 2 ]

2 голосов
/ 17 апреля 2019

Я использую эту запись в блоге для достижения желаемой логики, она выглядит намного лучше и очень аккуратно.

sealed class ConditionalApplicative[T] private (val value: T) { // if condition class wrapper
   class ElseApplicative(value: T, elseCondition: Boolean) extends ConditionalApplicative[T](value) {
   // else condition class wrapper extends ConditionalApplicative to avoid double wrapping 
   // in case: $if(condition1) { .. }. $if(condition2) { .. }
      def $else(f: T => T): T = if(elseCondition) f(value) else value
   }

   // if method for external scope condition
   def $if(condition: => Boolean)(f: T => T): ElseApplicative =
      if(condition) new ElseApplicative(f(value), false) 
      else new ElseApplicative(value, true)

   // if method for internal scope condition
   def $if(condition: T => Boolean) (f: T => T): ElseApplicative =
      if(condition(value)) new ElseApplicative(f(value), false) 
      else new ElseApplicative(value, true) 
}

object ConditionalApplicative { // Companion object for using ConditionalApplicative[T] generic
   implicit def lift2ConditionalApplicative[T](any: T): ConditionalApplicative[T] =
      new ConditionalApplicative(any)

   implicit def else2T[T](els: ConditionalApplicative[T]#ElseApplicative): T =
      els.value
}  

, импортировав это в свой метод, я могу сделать что-товот так:

 def writer(outputFormat: String, outputFile: String, outputMode: SaveMode, outputRepartionBy: String,outputParitionBy :String, outputOrderBy :String, dryRun: Boolean = false)(df: DataFrame): Unit = {
        import etl.tool.ConditionalApplicative._
        if (dryRun){
            df.show(500, false)
        }else{
            if (outputFormat == "parquet" | outputFormat == "orc" ) {
                df
                    .$if(outputOrderBy != null){
                        _.orderBy(col(outputOrderBy))
                    }.$if(outputRepartionBy != null){
                        _.repartition(col(outputRepartionBy))
                    }.write.format(outputFormat).mode(outputMode)
                    .$if(outputParitionBy != null){
                        _.partitionBy(outputParitionBy)
                    }.save( outputFile )

            } else {
                df.write.format(outputFormat).save(outputFile)
            }
        }
    }

Полученный код говорит сам за себя.Хотя мое понимание основной логики ограничено.

2 голосов
/ 17 апреля 2019

вы можете сделать что-то вроде

val temp=df.write.format(outputformat)
val writer =if ( outputRepartionBy != null )  temp.repartitionby(outputRepartitionBy)  else temp
writer.mode(outputMode).save(outputFile)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...