несоответствие типов с помощью параметров в scala - PullRequest
0 голосов
/ 09 апреля 2020

Я очень новичок в scala и пытаюсь создать функцию с помощью "Option". У меня есть функция, описанная ниже, которая создает фрейм данных, если требуемый файл паркета присутствует в "Path", в противном случае необходимо выдать ошибку:

  def getFeeds(sqlContext,Path: String): Either[Throwable, DataFrame] = {
     import sc.spark.implicits._
    try  {
         Right(sc.spark.read.parquet(Path))
     }
     catch {
      case e: ControlThrowable => throw e
      case e: Throwable => {
        log.error("getFeeds failed while reading the data ${Path}", e)
        Left(e)
      }
    }
  }

У меня есть другая функция (resultFeeds), которая ожидает фрейм данных из вышеприведенного (getFeeds), и я делаю так:

 val Feeds = getFeeds(sc,Path)
    var message = ""
    if (Feeds.isLeft) message = s"${message}Feeds:missing;"

    Feeds match {
        case Right(a) =>
              val df = resultFeeds(sc, a)
              case Left(_) => log.error("skipping lookup.Metrics due to internal error")
              dbObj.writeToPostgres(df, "lookup.Metrics")
             "Data Loaded into the db"
          }
        }

resultFeed сбрасывает фрейм данных в postgres Однако когда я пытаюсь запустить приведенный выше код, выводим ошибку как:

  found   : Unit
[error]     required : TestClass.this.output
               (which expands to)  String
[error]               case Right(a) =>
[error]                             ^
[error]                 not found: value df
[error]                   dbObj.writeToPostgres(df, "lookup.Metrics")

Пожалуйста, помогите мне !!

1 Ответ

2 голосов
/ 09 апреля 2020

Код для сопоставления с вашим шаблоном выключен:

feeds match {
  case Right(a) => 
    val df = resultFeeds(sc, a)
    dbObj.writeToPostgres(df, "lookup.Metrics")
    "Data Loaded into the db"
  case Left(_) => log.error("skipping lookup.Metrics due to internal error")
}

Кроме того, позвольте мне показать дополнительный подход к приведенному выше коду:

def getFeeds(sqlContext: SparkSession ,Path: String): Either[Throwable, DataFrame] = {
    import sc.spark.implicits._
    Try(sc.spark.read.parquet(Path)).toEither
}

А затем:

val dfOrFailure = getFeeds(sc, path).map(df => resultFeeds(sc, df))
dfOrFailure match {
  case Right(df) => dbObj.writeToPostgres(df, "lookup.Metrics")
  case Left(err) => log.error("Failed to fetch feed", err)
}

Обратите внимание, что если resultFeeds само возвращает Either[Throwable, A], вы можете продолжить цепочку выполнения, возможно, более четко с помощью для понимания:

def resultFeeds(sc: SparkContext, a: ???): Either[Throwable, DataFrame] = ???

val finalDf = for {
  dfFeed <- getFeeds(sc, path)
  res    <- resultFeeds(sc, dfFeed)
} yield res

finalDf match {
  case Right(df) => dbObj.writeToPostgres(df, "lookup.Metrics")
  case Left(err) => log.error("Failed to fetch feed", err)
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...