Код для сопоставления с вашим шаблоном выключен:
feeds match {
case Right(a) =>
val df = resultFeeds(sc, a)
dbObj.writeToPostgres(df, "lookup.Metrics")
"Data Loaded into the db"
case Left(_) => log.error("skipping lookup.Metrics due to internal error")
}
Кроме того, позвольте мне показать дополнительный подход к приведенному выше коду:
def getFeeds(sqlContext: SparkSession ,Path: String): Either[Throwable, DataFrame] = {
import sc.spark.implicits._
Try(sc.spark.read.parquet(Path)).toEither
}
А затем:
val dfOrFailure = getFeeds(sc, path).map(df => resultFeeds(sc, df))
dfOrFailure match {
case Right(df) => dbObj.writeToPostgres(df, "lookup.Metrics")
case Left(err) => log.error("Failed to fetch feed", err)
}
Обратите внимание, что если resultFeeds
само возвращает Either[Throwable, A]
, вы можете продолжить цепочку выполнения, возможно, более четко с помощью для понимания:
def resultFeeds(sc: SparkContext, a: ???): Either[Throwable, DataFrame] = ???
val finalDf = for {
dfFeed <- getFeeds(sc, path)
res <- resultFeeds(sc, dfFeed)
} yield res
finalDf match {
case Right(df) => dbObj.writeToPostgres(df, "lookup.Metrics")
case Left(err) => log.error("Failed to fetch feed", err)
}