Изменение запроса с использованием логического плана Spark Catalyst - PullRequest
0 голосов
/ 12 сентября 2018

Возможно ли добавить / заменить существующее выражение столбца в DataFrame API / SQL с использованием точки расширения.

Пример: предположим, что мы внедрили правило разрешения, которое могло бы проверить проект узел из плана и при проверке на столбец "имя", заменить его с верхним (имя), например.

Возможна ли такая вещь с использованием точек расширения. Примеры, которые у меня есть найдены в основном простые, которые не обрабатывают входные выражения так, как мне нужно.

Пожалуйста, дайте мне знать, если это возможно.

1 Ответ

0 голосов
/ 13 сентября 2018

Да, это возможно.

Давайте возьмем пример. Предположим, мы хотим написать правило, которое проверяет наличие оператора Project, и если проект предназначен для какого-то определенного столбца (скажем, 'column2'), то он умножается на 2.

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.Column
import org.apache.spark.sql.types._

object DoubleColumn2OptimizationRule extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case p: Project =>
          if (p.projectList.filter(_.name == "column2").size >= 1) {
              val newList = p.projectList.map { case x =>
                if (x.name == "column2") {
                  Alias(Multiply(Literal(2, IntegerType), x), "column2_doubled")()
                } else {
                  x
                }
              }
              p.copy(projectList = newList)
          } else {
              p
          }
    }
}

скажем, у нас есть таблица "table1", которая имеет два столбца - column1, column2.

Без этого правила -

> spark.sql("select column2 from table1 limit 10").collect()
Array([1], [2], [3], [4], [5], [6], [7], [8], [9], [10])

с этим правилом -

> spark.experimental.extraOptimizations =  Seq(DoubleColumn2OptimizationRule)
> spark.sql("select column2 from table1 limit 10").collect()
Array([2], [4], [6], [8], [10], [12], [14], [16], [18], [20])

Также вы можете вызвать объяснение на DataFrame, чтобы проверить план -

> spark.sql("select column2 from table1 limit 10").explain
== Physical Plan ==
CollectLimit 10
+- *(1) LocalLimit 10
   +- *(1) Project [(2 * column2#213) AS column2_doubled#214]
      +- HiveTableScan [column2#213], HiveTableRelation `default`.`table1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [column1#212, column2#213]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...