Этот пост о том, как неэффективно спарк присоединение таблицы с подзапросом с использованием той же таблицы.Использование Spark 2.2 и Scala.
У меня есть таблица (fieldcnts) со следующей схемой:
|key |name |value |cnt |keycnt |minkeycnt|type |
|A |f1 |123 |3 |100 |30 |-1 |
|A |f1 |234 |5 |100 |30 |-1 |
|A |f2 |345 |50 |225 |75 |-1 |
|B |f1 |456 |40 |100 |30 |-1 |
...
Эта таблица получена из кэшированной таблицы со следующим запросом (Q1):
SELECT key, 'f1' AS name, f1 AS value, COUNT(*) AS cnt,
first(keycnt) AS keycnt, first(keymincnt) AS keymincnt
FROM table1
GROUP BY key, f1
UNION ALL
SELECT key, 'f2' AS name, f2 AS value, COUNT(*) AS cnt,
first(keycnt) AS keycnt, first(keymincnt) AS keymincnt
FROM table1
GROUP BY key, f2
UNION ALL
...
(simiarly for f3, f4 and f5)
Схема таблицы 1:
|key|f1|f2|f3|f4|f5|keycnt|keymincnt|
Таким образом, таблица fieldcnts на этом этапе имеет счетчики каждого (key, f *) - кортежа.В настоящее время у меня есть keycnt и keymincnt (процент от keycnt), хранящиеся избыточно в каждой строке, чтобы избежать дополнительного объединения и вычисления, хотя это может измениться позже, если я получу более высокую производительность с объединением и меньшим набором данных.
Тип поля по умолчанию -1.Мне нужно впоследствии установить это на одно из 4 значений в зависимости от некоторых условий.Я делаю это с помощью следующего запроса (Q2):
SELECT R.key, R.name, R.value, R.cnt
CASE
WHEN ${cond1} THEN -2
WHEN ${cond2} THEN -3
WHEN ${cond3} THEN -4
ELSE -1
END AS type
FROM fieldcnts R
JOIN (SELECT key, name, COUNT(*) AS numrecs, SUM(CASE WHEN cnt >= keymincnt THEN cnt ELSE 0 END) AS fieldsum
FROM fieldcnts
GROUP BY key, name) AS S
ON R.key = S.key AND R.name = S.name
Где каждый $ {condn} является условием, включающим cnt, keymincnt, keycnt, fieldsum и numrecs (например, (A B) (возможно, довольно проприетарно, на всякий случай!)
Так что это работает, но во время настройки я заметил, что из-за того, что подзапрос в Q2 использует ту же таблицу, что и внешний запрос, он вызываетподзапрос для чтения с диска (сканирование паркета). Это проблема, потому что это происходит для каждого запроса в рамках запроса Q1 UNION ALL!
Я впервые заметил это при вычислении таблицы 1. Набор файлов паркета читается в, он выбирает подмножество полей, а затем я отфильтровываю все строки, в которых keycnt
SELECT R.key, R.f1, R.f2, R.f3, R.f4, R.f5, S.cnt AS keycnt, 0.33*S.cnt AS keymincnt
FROM tbl R
JOIN (SELECT key, COUNT(*) AS keycnt
FROM tbl T
GROUP BY key
HAVING keycnt >= ${minCnt}) AS S
ON R.key = S.key
(в действительности tbl использовался задолго до этого, поэтому я получаю его кешируемым в данный момент).Глядя на группу обеспечения доступности баз данных, я отчетливо видел, как внешняя таблица читается из кеша, а внутренняя таблица читается с диска! На самом деле я решил эту проблему, используя Windowing:
val dfSQL = spark.sql("SELECT key, f1, f2, f3, f4, f5 FROM tbl")
.withColumn("keycnt", count("*").over(Window.partitionBy($"key")))
.withColumn("keymincnt", $"key"*0.33)
.filter($"keycnt" >= minCnt)
Намного лучше. Нет SortMergeJoin или дискав этом случае прочитайте.
Теперь вернемся к Q2: это похоже на запрос table1, но гораздо сложнее.Я не мог придумать схему, использующую Windowing (все еще вид новичка с искрой).Есть ли способ использовать Windowing здесь?В качестве альтернативы есть ли способ предотвратить чтение искры с диска для подзапроса, использующего ту же таблицу?
Спасибо!
Обновление Возвращаясь к этому.Мой коллега нашел другое решение: использование mapPartitions, которое намного лучше подходит моей проблеме.
Я сохранил этот запрос, который распределяет по разделам по ключу, именно то, что я хочу:
SELECT R.key, R.f1, R.f2, R.f3, R.f4, R.f5, S.cnt AS keycnt, 0.33*S.cnt AS keymincnt
FROM tbl R
JOIN (SELECT key, COUNT(*) AS keycnt
FROM tbl T
GROUP BY key
HAVING keycnt >= ${minCnt}) AS S
ON R.key = S.key
Затем я написал остальную часть алгоритма в функции scala mapPartitions.Гораздо чище, намного быстрее.