Spark Альтернатива SubQuery с использованием той же таблицы - PullRequest
0 голосов
/ 09 февраля 2019

Этот пост о том, как неэффективно спарк присоединение таблицы с подзапросом с использованием той же таблицы.Использование Spark 2.2 и Scala.

У меня есть таблица (fieldcnts) со следующей схемой:

|key |name |value |cnt |keycnt |minkeycnt|type |
|A   |f1   |123   |3   |100    |30       |-1   |
|A   |f1   |234   |5   |100    |30       |-1   |
|A   |f2   |345   |50  |225    |75       |-1   |
|B   |f1   |456   |40  |100    |30       |-1   |
...

Эта таблица получена из кэшированной таблицы со следующим запросом (Q1):

SELECT key, 'f1' AS name, f1 AS value, COUNT(*) AS cnt,
       first(keycnt) AS keycnt, first(keymincnt) AS keymincnt
FROM table1
GROUP BY key, f1
UNION ALL
SELECT key, 'f2' AS name, f2 AS value, COUNT(*) AS cnt,
       first(keycnt) AS keycnt, first(keymincnt) AS keymincnt
FROM table1
GROUP BY key, f2
UNION ALL
...
(simiarly for f3, f4 and f5)

Схема таблицы 1:

|key|f1|f2|f3|f4|f5|keycnt|keymincnt|

Таким образом, таблица fieldcnts на этом этапе имеет счетчики каждого (key, f *) - кортежа.В настоящее время у меня есть keycnt и keymincnt (процент от keycnt), хранящиеся избыточно в каждой строке, чтобы избежать дополнительного объединения и вычисления, хотя это может измениться позже, если я получу более высокую производительность с объединением и меньшим набором данных.

Тип поля по умолчанию -1.Мне нужно впоследствии установить это на одно из 4 значений в зависимости от некоторых условий.Я делаю это с помощью следующего запроса (Q2):

SELECT R.key, R.name, R.value, R.cnt
    CASE 
        WHEN ${cond1} THEN -2
        WHEN ${cond2} THEN -3
        WHEN ${cond3} THEN -4
        ELSE -1
    END AS type
FROM fieldcnts R
JOIN (SELECT key, name, COUNT(*) AS numrecs, SUM(CASE WHEN cnt >= keymincnt THEN cnt ELSE 0 END) AS fieldsum
      FROM fieldcnts
      GROUP BY key, name) AS S
ON R.key = S.key AND R.name = S.name

Где каждый $ {condn} является условием, включающим cnt, keymincnt, keycnt, fieldsum и numrecs (например, (A B) (возможно, довольно проприетарно, на всякий случай!)

Так что это работает, но во время настройки я заметил, что из-за того, что подзапрос в Q2 использует ту же таблицу, что и внешний запрос, он вызываетподзапрос для чтения с диска (сканирование паркета). Это проблема, потому что это происходит для каждого запроса в рамках запроса Q1 UNION ALL!

Я впервые заметил это при вычислении таблицы 1. Набор файлов паркета читается в, он выбирает подмножество полей, а затем я отфильтровываю все строки, в которых keycnt

SELECT R.key, R.f1, R.f2, R.f3, R.f4, R.f5, S.cnt AS keycnt, 0.33*S.cnt AS keymincnt
FROM tbl R
JOIN (SELECT key, COUNT(*) AS keycnt
      FROM tbl T
      GROUP BY key
      HAVING keycnt >= ${minCnt}) AS S
ON R.key = S.key

(в действительности tbl использовался задолго до этого, поэтому я получаю его кешируемым в данный момент).Глядя на группу обеспечения доступности баз данных, я отчетливо видел, как внешняя таблица читается из кеша, а внутренняя таблица читается с диска! На самом деле я решил эту проблему, используя Windowing:

val dfSQL = spark.sql("SELECT key, f1, f2, f3, f4, f5 FROM tbl")
    .withColumn("keycnt", count("*").over(Window.partitionBy($"key")))
    .withColumn("keymincnt", $"key"*0.33)
    .filter($"keycnt" >= minCnt)

Намного лучше. Нет SortMergeJoin или дискав этом случае прочитайте.

Теперь вернемся к Q2: это похоже на запрос table1, но гораздо сложнее.Я не мог придумать схему, использующую Windowing (все еще вид новичка с искрой).Есть ли способ использовать Windowing здесь?В качестве альтернативы есть ли способ предотвратить чтение искры с диска для подзапроса, использующего ту же таблицу?

Спасибо!

Обновление Возвращаясь к этому.Мой коллега нашел другое решение: использование mapPartitions, которое намного лучше подходит моей проблеме.

Я сохранил этот запрос, который распределяет по разделам по ключу, именно то, что я хочу:

SELECT R.key, R.f1, R.f2, R.f3, R.f4, R.f5, S.cnt AS keycnt, 0.33*S.cnt AS keymincnt
FROM tbl R
JOIN (SELECT key, COUNT(*) AS keycnt
    FROM tbl T
    GROUP BY key
    HAVING keycnt >= ${minCnt}) AS S
ON R.key = S.key

Затем я написал остальную часть алгоритма в функции scala mapPartitions.Гораздо чище, намного быстрее.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...