Я обрабатываю данные, где некоторые записи могут быть повреждены.Поэтому я решил исследовать данные и использовал Either
для разделения действительных и недействительных записей.
Я выяснил, как подсчитать количество записей каждого вида и теперь получаю выходные данные для failedCount
и * 1005.* успешно.
Но у меня проблема с распечаткой каждой недействительной (левой) записи о продаже.Что может быть не так с моим подходом?
Я не получаю вывод при печати failedSales
def filterSales(rawSales: RDD[Sale]): RDD[(String, Sale)] = {
val filteredSales = rawSales
.map(sale => {
val saleOption = Try(sale.id -> sale)
saleOption match {
case Success(successSale) => Right(successSale)
case Failure(e) => Left(s"Corrupted sale: $sale;", e)
}
})
val failedCount: Long = filteredSales.filter(_.isLeft).count()
val successCount: Long = filteredSales.filter(_.isRight).count()
println("FAILED SALES COUNT: " + failedCount)
println("SUCCESS SALES COUNT: " + successCount)
// Problem here
val failedSales: RDD[Either.LeftProjection[(String, Throwable), (String, Sale)]] = filteredSales.map(_.left)
println("FAILED SALES: ")
// Doesn't produce any output
failedSales.foreach(println)
}