У меня есть подход, но довольно запутанный. Он сбрасывает Java Object и JSON (процесс сериализации бедного человека), десериализует его в объект python, фильтрует и анализирует имена таблиц
import json
def get_tables(query: str):
plan = spark._jsparkSession.sessionState().sqlParser().parsePlan(query)
plan_items = json.loads(plan.toJSON())
for plan_item in plan_items:
if plan_item['class'] == 'org.apache.spark.sql.catalyst.analysis.UnresolvedRelation':
yield plan_item['tableIdentifier']['table']
, что дает ['fast_track_gv_nexus', 'buybox_gv_nexus']
, когда я перебираю функцию list(get_tables(query))
Примечание К сожалению, это разрывы для CTE
Пример
with delta as (
select *
group by id
cluster by id
)
select *
from ( select *
FROM
(select *
from dmm
inner join delta on dmm.id = delta.id
)
)
И чтобы решить эту проблему, мне нужно взломатьвокруг через регулярное выражение
import json
import re
def get_tables(query: str):
plan = spark._jsparkSession.sessionState().sqlParser().parsePlan(query)
plan_items = json.loads(plan.toJSON())
plan_string = plan.toString()
cte = re.findall(r"CTE \[(.*?)\]", plan_string)
for plan_item in plan_items:
if plan_item['class'] == 'org.apache.spark.sql.catalyst.analysis.UnresolvedRelation':
tableIdentifier = plan_item['tableIdentifier']
table = plan_item['tableIdentifier']['table']
database = tableIdentifier.get('database', '')
table_name = "{}.{}".format(database, table) if database else table
if table_name not in cte:
yield table_name