Я новый разработчик в Spark Scala и хочу улучшить свой код с помощью широковещательного соединения.
Как я понимаю, широковещательное соединение может оптимизировать код, если у нас большой DataFrame с небольшимодин.Это именно тот случай для меня.У меня есть первый DF (tab1 в моем примере), который содержит более 3 миллиардов данных, которые мне нужно объединить со вторым, содержащим только 900 данных.
Вот мой sql запрос:
SELECT tab1.id1, regexp_extract(tab2.emp_name, ".*?(\\d+)\\)$", 1) AS city,
topo_2g3g.emp_id AS emp_id, tab1.emp_type
FROM table1 tab1
INNER JOIN table2 tab2
ON (tab1.emp_type = tab2.emp_type AND tab1.start = tab2.code)
А вот моя попытка использовать широковещательное соединение:
val tab1 = df1.filter(""" id > 100 """).as("table1")
val tab2 = df2.filter(""" id > 100 """).as("table2")
val result = tab1.join(
broadcast(tab2)
, col("tab1.emp_type") === col("tab2.emp_type") && col("tab1.start") === col("tab2.code")
, "inner")
Проблема в том, что этот способ вообще не оптимизирован.Я имею в виду, что он содержит ВСЕ столбцы для двух таблиц, хотя мне не нужны все эти столбцы.Мне просто нужно 3 из них и последний (с регулярным выражением на нем), что совсем не оптимально.Как будто мы сначала создаем очень большую таблицу, а затем уменьшаем ее до маленькой таблицы.В SQL мы получили небольшую таблицу.
Итак, после этого шага:
- Мне нужно использовать withColumn для генерации нового столбца (с регулярным выражением)
- Примените метод фильтра, чтобы выбрать 3 colmuns, которые я. Хотя я получил их НЕМЕДЛЕННО в sql (без фильтра, я имею в виду).
Можете ли вы помочь мне, пожалуйста, оптимизировать мой код и мойзапрос ?Заранее спасибо