Spark Scala: как фильтровать СДР и обновлять счетчик одновременно - PullRequest
0 голосов
/ 18 декабря 2018

Я новичок в Spark / Scala.Мой начальный RDD имеет тип Records, и расположение записей:

a_key, b_key,c_key,f_name,l_name,address

Теперь мне нужно:

  • удалить записи, которые имеют a_key или b_key илиc_key - null / empty
  • Мне нужно одновременно обновить счетчик для недействительных записей.

Я пробовал это следующим образом:

sc.register( recordStatsAccumulator, "Stat accumulator for " + filename )

val nullFilteredRecords = records.map{ record =>

  if( record.A_KEY.isEmpty ||
    record.B_KEY.isEmpty ||
    record.C_KEY.isEmpty )
  {
    recordStatsAccumulator.add( ValidationLoggingUtil.INVALID )
  }

  record

 }
 .filter( record =>
    !record.A_KEY.isEmpty &&
      !record.B_KEY.isEmpty &&
      !record.C_KEY.isEmpty
  )

Однако этот код неэффективен, так как он проходит через весь СДР дважды.Сначала обновите счетчик для недействительных записей, а затем снова удалите недопустимые записи.

Есть ли лучший / эффективный способ сделать это?

1 Ответ

0 голосов
/ 18 декабря 2018

Я думаю, вы можете объединить две операции за один шаг.Как это:

val nullFilteredRecords = records.filter { record =>
  if( record.A_KEY.isEmpty ||
    record.B_KEY.isEmpty ||
    record.C_KEY.isEmpty ) {
    recordStatsAccumulator.add( ValidationLoggingUtil.INVALID )
  }
  !record.A_KEY.isEmpty && !record.B_KEY.isEmpty && !record.C_KEY.isEmpty
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...