Scala spark - считать нулевое значение в столбцах данных с использованием аккумулятора - PullRequest
1 голос
/ 20 марта 2020

У меня есть эта функция:

def countNullValueColumn(df: DataFrame): Array[(String, Long)] = 
   df.columns
      .map(x => (x, df.filter(df(x).isNull || df(x) === "" || df(x).isNan).count))

Я пытаюсь использовать val counter = sc.longAccumulator вместо функции подсчета кадров, но безуспешно.

Попытки, которые я предпринял, были :

df.columns.foreach(x => {df.filter(df(x).isNull || df(x) === "" || df(x).isNaN) {counter.add(1)} (x, counter.value)})
df.columns.foreach(x => {df.filter(df(x).isNull || df(x) === "" || df(x).isNaN) {counter.add(1); (x, counter.value)} })

К сожалению, ничего из этого не работает, поскольку он не возвращает правильный тип (Array[(String, Long)]).

У кого-нибудь есть идеи или предложения? Заранее спасибо

Ps Я не знаю, является ли использование аккумулятора более эффективным, чем подсчет, но я просто хотел бы попробовать.

Редактировать: Должен ли я вместо этого использовать foreach map чтобы не иметь неправильное значение в аккумуляторе? Поскольку map - это преобразование, а foreach - это действие

Edit2: в соответствии с предложением @DNA я изменил map на foreach внутри своего кода.

Edit3 : Хорошо, теперь проблема стала пытаться создать Array[(String, Long)]. Я пробовал это, но оператор :+ не работает.

val counter = session.sparkContext.longAccumulator
val res: Array[(String, Long)] = Array()
df.columns
    .foreach(x => res :+ (x, df.filter{ df(x).isNull || df(x) === "" || df(x).isNaN {counter.add(1); counter.value}}))

У кого-нибудь есть идеи или предложения?

1 Ответ

1 голос
/ 20 марта 2020

В документации обсуждается эта топи c:

Аккумуляторы не меняют ленивую оценочную модель Spark. Если они обновляются в операции над RDD, их значение обновляется только после того, как RDD вычисляется как часть действия. Следовательно, обновления аккумулятора не гарантированно выполняются при выполнении ленивого преобразования, такого как map (). Следующий фрагмент кода демонстрирует это свойство:

val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.

Существует дополнительная проблема с получением надежных результатов от аккумуляторов:

Для обновлений аккумулятора, выполняемых только внутри действий, Spark гарантирует, что обновление каждой задачи в накопителе будет применено только один раз, т.е. перезапущенные задачи не будут обновлять значение. При преобразованиях пользователи должны знать, что каждое обновление задачи может применяться несколько раз, если задачи или этапы задания повторно выполняются.

Таким образом, по обеим этим причинам следует отдавать предпочтение таким действиям, как foreach через преобразования, такие как map, при использовании такого аккумулятора.

Также обратите внимание, что вы запускаете foreach по массиву столбцов, а не по самому DataFrame - тогда вы запускаете filter преобразование неоднократно на вашем DataFrame. Так что в этом случае foreach вообще не является действием Spark, это просто метод для Array.

Так что вам, вероятно, понадобится map над массивом df.columns (так что вы получите массив для возврата из вашей функции), затем действие foreach над фактическим DataFrame (для выполнения подсчета).

Вот один из способов сделать это:

df.columns.map(col => {
  val acc = sc.accumulator(0)
  df.foreach(row => {
    val v = row.getAs[Any](col)
    if (v == null || v == "") acc += 1  // NaN left as an exercise
    }
  )
  (col, acc.value)
})

Но обратите внимание что это всегда будет неэффективно, потому что мы должны передать DataFrame для каждого столбца. Вероятно, было бы более эффективно подсчитать все столбцы за один проход (сгенерировать кортеж или карту отсчетов для каждой строки), а затем объединить подсчеты, используя reduce или fold или аналогичные, вместо использования счетчиков.

...