Получить примеры строк, которые удаляются фильтром из фрейма данных spark - PullRequest
0 голосов
/ 05 июля 2018

Предположим, у меня есть искровой фрейм данных df с некоторыми столбцами (id, ...) и строка sqlFilter с фильтром SQL, например. "id is not null". Я хочу отфильтровать кадр данных df на основе sqlFilter, т.е.

val filtered = df.filter(sqlFilter)

Теперь я хочу получить список из 10 идентификаторов из df, которые были удалены фильтром.

В настоящее время я использую соединение "leftanti" для достижения этой цели, т.е.

val examples = df.select("id").join(filtered.select("id"), Seq("id"), "leftanti")
                 .take(10)
                 .map(row => Option(row.get(0)) match { case None => "null" case Some(x) => x.toString})

Однако, это действительно медленно. Я предполагаю, что это может быть реализовано быстрее, потому что спарк должен иметь только список для каждого раздела и добавить идентификатор в список, когда фильтр удаляет строку, а список содержит менее 10 элементов. После того, как действие после Фильтр завершается, спарк должен собрать все списки с разделов, пока у него не будет 10 идентификаторов.

Я хотел использовать аккумуляторы, как описано здесь , но я потерпел неудачу, потому что я не мог найти, как разобрать и использовать sqlFilter.

Кто-нибудь знает, как я могу улучшить производительность?

Обновление Рамеш Махарджан предложил в комментариях инвертировать SQL-запрос, т.е.

df.filter(s"NOT ($filterString)")
          .select(key)
          .take(10)
          .map(row => Option(row.get(0)) match { case None => "null" case Some(x) => x.toString})

Это действительно улучшает производительность, но не на 100% эквивалентно. Если есть несколько строк с одним и тем же идентификатором, идентификатор будет в конечном итоге в примерах, если одна строка будет удалена из-за фильтра. С присоединением leftantit это не заканчивается в примерах, потому что id все еще в filtered. Тем не менее, это нормально для меня.

Мне все еще интересно, можно ли создать список "на лету" с аккумуляторами или чем-то подобным.

Обновление 2

Другая проблема с инвертированием фильтра - это логическое значение UNKNOWN в SQL, поскольку NOT UNKNWON = UNKNOWN, т. Е. NOT(null <> 1) <=> UNKNOWN, и, следовательно, эта строка не отображается ни в фильтрованном кадре данных, ни в инвертированном кадре данных.

1 Ответ

0 голосов
/ 15 июля 2018

Вы можете использовать собственный аккумулятор (потому что longAccumulator вам не поможет, так как все идентификаторы будут нулевыми); и вы должны сформулировать свой оператор фильтра как функцию:

Предположим, у вас есть фрейм данных:

+----+--------+
|  id|    name|
+----+--------+
|   1|record 1|
|null|record 2|
|   3|record 3|
+----+--------+

Тогда вы можете сделать:

import org.apache.spark.util.AccumulatorV2

class RowAccumulator(var value: Seq[Row]) extends AccumulatorV2[Row, Seq[Row]] {
  def this() = this(Seq.empty[Row])
  override def isZero: Boolean = value.isEmpty
  override def copy(): AccumulatorV2[Row, Seq[Row]] = new RowAccumulator(value)
  override def reset(): Unit = value = Seq.empty[Row]
  override def add(v: Row): Unit = value = value :+ v
  override def merge(other: AccumulatorV2[Row, Seq[Row]]): Unit = value = value ++ other.value
}

val filteredAccum = new RowAccumulator()
ss.sparkContext.register(filteredAccum, "Filter Accum")

val filterIdIsNotNull = (r:Row) => {
  if(r.isNullAt(r.fieldIndex("id"))) {
    filteredAccum.add(r)
    false
  } else {
    true
  }}

df
  .filter(filterIdIsNotNull)
  .show()

println(filteredAccum.value)

т

+---+--------+
| id|    name|
+---+--------+
|  1|record 1|
|  3|record 3|
+---+--------+

List([null,record 2])

Но лично я бы не стал этого делать, я бы предпочел сделать то, что вы уже предлагали:

val dfWithFilter = df
  .withColumn("keep",expr("id is not null"))
  .cache() // check whether caching is feasibly

// show 10 records which we do not keep
dfWithFilter.filter(!$"keep").drop($"keep").show(10) // or use take(10)

+----+--------+
|  id|    name|
+----+--------+
|null|record 2|
+----+--------+

// rows that we keep
val filteredDf = dfWithFilter.filter($"keep").drop($"keep")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...