Scala Искра в списке вещания - PullRequest
0 голосов
/ 09 апреля 2020

Я пытаюсь выполнить фильтр isin максимально оптимизированным. Есть ли способ трансляции collList с использованием Scala API?

Редактировать: я не ищу альтернативы, я их знаю, но мне нужно, чтобы мои RelationProviders опускали значения.

  val collList = collectedDf.map(_.getAs[String]("col1")).sortWith(_ < _)
  //collList.size == 200.000
  val retTable = df.filter(col("col1").isin(collList: _*))

Список, который я передаю Метод isin содержит до 200.000 уникальных элементов.

Я знаю, что это не лучший вариант, и объединение звучит лучше, но Мне нужно, чтобы эти элементы были вставлены в фильтры , что очень важно при чтении ( мое хранилище - Kudu, но оно также применимо к HDFS + Parquet, базовые данные слишком велики, и запросы обрабатывают около 1% этих данных) , я уже все измерил, и это сэкономило мне время выполнения около 30 минут :). Кроме того, мой метод уже принимает меры, если isin больше, чем 200.000.

Моя проблема в том, что я получаю некоторые предупреждения Spark «задача слишком большая» (~ 8 МБ на задачу), все работает нормально, поэтому не большое дело, но я хочу удалить их, а также оптимизировать.

Я пытался с этим, который ничего не делает, так как я все еще получаю предупреждение (так как широковещательная переменная разрешается в Scala и передается в я думаю, что vargargs):

  val collList = collectedDf.map(_.getAs[String]("col1")).sortWith(_ < _)
  val retTable = df.filter(col("col1").isin(sc.broadcast(collList).value: _*))

А этот, который не компилируется:

  val collList = collectedDf.map(_.getAs[String]("col1")).sortWith(_ < _)
  val retTable = df.filter(col("col1").isin(sc.broadcast(collList: _*).value))

А этот, который не работает (задача слишком большая, все еще появляется)

  val broadcastedList=df.sparkSession.sparkContext.broadcast(collList.map(lit(_).expr))
  val filterBroadcasted=In(col("col1").expr, collList.value)
  val retTable = df.filter(new Column(filterBroadcasted))

Есть идеи, как транслировать эту переменную? (хаки разрешены). Любая альтернатива isin, которая позволяет нажатие фильтра, также допустима Я видел, что некоторые люди делают это на PySpark, но API не тот.

PS: Изменения в хранилище невозможны, Я знаю разделение (уже разделено, но не этим полем), и это могло бы помочь, но пользовательский ввод абсолютно случайный, и к данным обращаются и меняются многие мои клиенты.

Ответы [ 2 ]

1 голос
/ 09 апреля 2020

Я бы выбрал в этом случае широкополосное преобразование данных со значением ha sh вместо широковещательной переменной.

Подготовьте фрейм данных со списком сбора collectedDf("col1"), который вы хотите отфильтровать с помощью isin, а затем используйте объединение между двумя кадрами данных, чтобы отфильтровать совпадения строк.

enter image description here

Я думаю, что это будет более эффективно, чем isin, поскольку у вас есть 200k записей для фильтрации. spark.sql.autobroadcastjointhreshhold - это свойство, которое необходимо установить с соответствующим размером (по умолчанию 10 МБ). AFAIK вы можете использовать до 200 МБ или 3oomb в зависимости от ваших требований.

см. Это BHJ Объяснение того, как это работает

Дальнейшее чтение Spark эффективно фильтрует записи от больших кадр данных, который существует в небольшом кадре данных

0 голосов
/ 09 апреля 2020

Я просто уйду с большими задачами, так как я использую это только дважды (но экономит много времени) в своей программе, и я могу себе это позволить, но если кому-то еще это сильно нужно ... ну, похоже, это путь.

Наилучшие альтернативы, которые я обнаружил, - это pushdown для больших массивов:

  1. Измените ваш поставщик отношений, чтобы он передавал большие списки при нажатии в фильтрах. оставьте некоторые трансляции sh, но хорошо ... если ваше приложение не транслируется, это не должно быть проблемой, или вы можете сохранить в глобальном списке и очистить их через некоторое время
  2. Добавьте фильтр в Spark (я написал что-то на https://issues.apache.org/jira/browse/SPARK-31417), который позволяет транслировать pushdown до вашего поставщика отношений . Вам нужно будет добавить свой пользовательский предикат, затем реализовать свой пользовательский «Pushdown» (вы можете сделать это, добавив новое правило), а затем переписать свой поставщик RDD / Relation, чтобы он мог использовать тот факт, что переменная транслируется.
  3. Используйте coalesce (X) после чтения, чтобы уменьшить количество задач, иногда может работать, зависит от того, как реализован RelationProvider / RDD .
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...