Я новичок в Spark / Scala.Мой начальный RDD имеет тип Records, и расположение записей:
a_key, b_key,c_key,f_name,l_name,address
Теперь мне нужно:
- удалить записи, которые имеют a_key или b_key илиc_key - null / empty
- Мне нужно одновременно обновить счетчик для недействительных записей.
Я пробовал это следующим образом:
sc.register( recordStatsAccumulator, "Stat accumulator for " + filename )
val nullFilteredRecords = records.map{ record =>
if( record.A_KEY.isEmpty ||
record.B_KEY.isEmpty ||
record.C_KEY.isEmpty )
{
recordStatsAccumulator.add( ValidationLoggingUtil.INVALID )
}
record
}
.filter( record =>
!record.A_KEY.isEmpty &&
!record.B_KEY.isEmpty &&
!record.C_KEY.isEmpty
)
Однако этот код неэффективен, так как он проходит через весь СДР дважды.Сначала обновите счетчик для недействительных записей, а затем снова удалите недопустимые записи.
Есть ли лучший / эффективный способ сделать это?