Я пытаюсь выполнить фильтр isin максимально оптимизированным. Есть ли способ трансляции collList с использованием Scala API?
Редактировать: я не ищу альтернативы, я их знаю, но мне нужно, чтобы мои RelationProviders опускали значения.
val collList = collectedDf.map(_.getAs[String]("col1")).sortWith(_ < _)
//collList.size == 200.000
val retTable = df.filter(col("col1").isin(collList: _*))
Список, который я передаю Метод isin содержит до 200.000 уникальных элементов.
Я знаю, что это не лучший вариант, и объединение звучит лучше, но Мне нужно, чтобы эти элементы были вставлены в фильтры , что очень важно при чтении ( мое хранилище - Kudu, но оно также применимо к HDFS + Parquet, базовые данные слишком велики, и запросы обрабатывают около 1% этих данных) , я уже все измерил, и это сэкономило мне время выполнения около 30 минут :). Кроме того, мой метод уже принимает меры, если isin больше, чем 200.000.
Моя проблема в том, что я получаю некоторые предупреждения Spark «задача слишком большая» (~ 8 МБ на задачу), все работает нормально, поэтому не большое дело, но я хочу удалить их, а также оптимизировать.
Я пытался с этим, который ничего не делает, так как я все еще получаю предупреждение (так как широковещательная переменная разрешается в Scala и передается в я думаю, что vargargs):
val collList = collectedDf.map(_.getAs[String]("col1")).sortWith(_ < _)
val retTable = df.filter(col("col1").isin(sc.broadcast(collList).value: _*))
А этот, который не компилируется:
val collList = collectedDf.map(_.getAs[String]("col1")).sortWith(_ < _)
val retTable = df.filter(col("col1").isin(sc.broadcast(collList: _*).value))
А этот, который не работает (задача слишком большая, все еще появляется)
val broadcastedList=df.sparkSession.sparkContext.broadcast(collList.map(lit(_).expr))
val filterBroadcasted=In(col("col1").expr, collList.value)
val retTable = df.filter(new Column(filterBroadcasted))
Есть идеи, как транслировать эту переменную? (хаки разрешены). Любая альтернатива isin, которая позволяет нажатие фильтра, также допустима Я видел, что некоторые люди делают это на PySpark, но API не тот.
PS: Изменения в хранилище невозможны, Я знаю разделение (уже разделено, но не этим полем), и это могло бы помочь, но пользовательский ввод абсолютно случайный, и к данным обращаются и меняются многие мои клиенты.