Как получить имя таблицы из Spark SQL Query [PySpark]? - PullRequest
7 голосов
/ 25 октября 2019

Чтобы получить имя таблицы из запроса SQL,

select *
from table1 as t1
full outer join table2 as t2
  on t1.id = t2.id

Я нашел решение в Scala Как получить имена таблиц из запроса SQL?

def getTables(query: String): Seq[String] = {
  val logicalPlan = spark.sessionState.sqlParser.parsePlan(query)
  import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
  logicalPlan.collect { case r: UnresolvedRelation => r.tableName }
}

, который дает мне правильные имена таблиц, когда я повторяю последовательность возврата getTables(query).foreach(println)

table1
table2

Каким будет эквивалентный синтаксис для PySpark? Самым близким, с которым я столкнулся, было Как извлечь имя столбца и тип столбца из SQL в pyspark

plan = spark_session._jsparkSession.sessionState().sqlParser().parsePlan(query)
print(f"table: {plan.tableDesc().identifier().table()}")

, который завершается с помощью traceback

Py4JError: An error occurred while calling o78.tableDesc. Trace:
py4j.Py4JException: Method tableDesc([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:835)

Я понимаю, что проблема связана с тем, что мне нужно отфильтровать все элементы плана, которые имеют тип UnresolvedRelation, но я не могу найти эквивалентную запись в python / pyspark

1 Ответ

2 голосов
/ 25 октября 2019

У меня есть подход, но довольно запутанный. Он сбрасывает Java Object и JSON (процесс сериализации бедного человека), десериализует его в объект python, фильтрует и анализирует имена таблиц

import json
def get_tables(query: str):
    plan = spark._jsparkSession.sessionState().sqlParser().parsePlan(query)
    plan_items = json.loads(plan.toJSON())
    for plan_item in plan_items:
        if plan_item['class'] == 'org.apache.spark.sql.catalyst.analysis.UnresolvedRelation':
            yield plan_item['tableIdentifier']['table']

, что дает ['fast_track_gv_nexus', 'buybox_gv_nexus'], когда я перебираю функцию list(get_tables(query))

Примечание К сожалению, это разрывы для CTE

Пример

with delta as (
   select *
    group by id
    cluster by id
 )
select   *
  from ( select  *
         FROM
          (select   *
            from dmm
            inner join delta on dmm.id = delta.id
           )
  )

И чтобы решить эту проблему, мне нужно взломатьвокруг через регулярное выражение

import json
import re
def get_tables(query: str):
    plan = spark._jsparkSession.sessionState().sqlParser().parsePlan(query)
    plan_items = json.loads(plan.toJSON())
    plan_string = plan.toString()
    cte = re.findall(r"CTE \[(.*?)\]", plan_string)
    for plan_item in plan_items:
        if plan_item['class'] == 'org.apache.spark.sql.catalyst.analysis.UnresolvedRelation':
            tableIdentifier = plan_item['tableIdentifier']
            table =  plan_item['tableIdentifier']['table']   
            database =  tableIdentifier.get('database', '')
            table_name = "{}.{}".format(database, table) if database else table
            if table_name not in cte:
                yield table_name
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...