Фильтрация наборов данных Spark в одном столбце с помощью лямбда-функции - PullRequest
0 голосов
/ 24 октября 2018

Я пытаюсь ускорить фильтрацию на , избегая десериализации каждой строки , так как мой набор данных содержит большое количество строк.Я хочу разрешить спарк фильтровать на основе одного столбца (например, g col1), что позволит избежать десериализации всей строки.Кроме того, я забочусь о возвращении Dataset[MyEntry], аналогично вводу, а не Dataset[Row].

// The three filteres were broadcasted as a Map 
// with the key being the name of the filter and value being 
// the filter itself
// dataSet is Dataset[MyEntry]

case class MyEntry(col1: Int, col2: Int, col3: Int)

val allFilterNames = Array("one","two","three")

dataSet.filter(value => {
          def foo(value: MyEntry): Boolean = {
            var found = false
            for (entry <- allFilterNames) {
              if (broadcastVariable.value.get(entry).get.contains(value.col1)) {
                return true
              }
            }
            found
          }
          foo(value)
})

Контекст: Я пытаюсь отфильтровать столбец набора данных Spark на основенабор широковещательных структур членства (например, фильтр Блума), содержащихся в карте (ключ: имя фильтра, значение: фактический фильтр).Я хочу проверить, существует ли значение столбца в (например, 3) широковещательных фильтрах.Если значение существует, тогда вся строка сохраняется.В противном случае строка фильтруется.Я использовал лямбда-функцию, и она работает.Тем не менее, это связано с тем, что вся строка будет десериализована каждый раз, когда я выполняю проверку.Я хочу избежать этих накладных расходов, применяя одну и ту же функцию вместо одного столбца (т. Е. Разрешить десериализации для одного столбца вместо целой строки).Приведенный выше код хорошо работает при использовании лямбда-функции.Решение , которое он ищет, должно быть на дешевле во времени по сравнению с вышеупомянутым решением (т. Е. Передать весь набор данных только один раз)

...