У меня одна стадия сбоя задания Spark из-за java.lang.NullPointerException
, брошенного функцией в преобразовании map
.
Моя идея состоит в том, чтобы получить поврежденный объект Sale изнутри map
с помощью Try
типа.Поэтому я намеренно присвоил результат функции переменной saleOption
, чтобы затем выполнить сопоставление с образцом.
К сожалению, моя текущая реализация не работает, и мне нужен совет, как это исправить.Буду благодарен за любые предложения.
Вот начальный метод:
def filterSales(rawSales: RDD[Sale]): RDD[(String, Sale)] = {
rawSales
.map(sale => sale.id -> sale) // throws NullPointerException
.reduceByKey((sale1, sale2) => if (sale1.timestamp > sale2.timestamp) sale1 else sale2)
}
Вот как я реализовал свою идею:
def filterSales(rawSales: RDD[Sale]): RDD[(String, Sale)] = {
rawSales
.map(sale => {
val saleOption: Option[(String, Sale)] = Try(sale.id -> sale).toOption
saleOption match {
case Success(successSale) => successSale
case Failure(e) => throw new IllegalArgumentException(s"Corrupted sale: $rawSale;", e)
}
})
.reduceByKey((sale1, sale2) => if (sale1.timestamp > sale2.timestamp) sale1 else sale2)
}
UPD: Мое намерение состоит в том, чтобы реализовать идею в целях отладки и улучшить мои знания Scala.Я не собираюсь использовать Try
и Exceptions
для управления потоком.