уменьшить объем данных, сканируемых Athena при использовании агрегатных функций - PullRequest
3 голосов
/ 26 апреля 2019

Приведенный ниже запрос сканирует 100 МБ данных.

select * from table where column1 = 'val' and partition_id = '20190309';

Однако приведенный ниже запрос сканирует 15 ГБ данных (имеется более 90 разделов)

select * from table where column1 = 'val' and partition_id in (select max(partition_id) from table);

Как мне оптимизировать второй запрос для сканирования того же объема данных, что и первый?

Ответы [ 2 ]

6 голосов
/ 27 апреля 2019

Здесь есть две проблемы. Эффективность скалярного подзапроса выше select max(partition_id) from table, а @PiotrFindeisen указывал на динамическую фильтрацию.

Первая проблема заключается в том, что запросы к ключам секционирования таблицы Hive намного сложнее, чем кажутся. Большинство людей подумают, что если вам нужно максимальное значение ключа раздела, вы можете просто выполнить запрос по ключам раздела, но это не сработает, потому что Hive позволяет разделам быть пустыми (а также позволяет не пустые файлы, не содержать строк). В частности, скалярный подзапрос выше select max(partition_id) from table требует, чтобы Presto нашел максимальный раздел, содержащий хотя бы одну строку. Идеальным решением было бы иметь идеальную статистику в Hive, но за исключением того, что движок должен был иметь собственную логику для улья, которая открывает файлы разделов, пока не найдет непустую.

Если вы уверены, что ваш склад не содержит пустых разделов (или если вы согласны с последствиями этого), вы можете заменить скалярный подзапрос одним на скрытую $partitions таблицу "

select * 
from table 
where column1 = 'val' and 
    partition_id = (select max(partition_id) from "table$partitions");

Вторая проблема, на которую указал @PiotrFindeisen, связана с тем, как планируются и выполняются запросы. Большинство людей смотрят на приведенный выше запрос, видят, что механизм, очевидно, должен вычислить значение select max(partition_id) from "table$partitions" во время планирования, включить его в план и затем продолжить оптимизацию. К сожалению, это довольно сложное решение для принятия в общем, поэтому вместо этого механизм просто моделирует это как широковещательное соединение, где одна часть выполнения вычисляет это значение и передает значение остальным работникам. Проблема заключается в том, что остальная часть выполнения не может добавить эту новую информацию в существующую обработку, поэтому она просто сканирует все данные и затем отфильтровывает значения, которые вы пытаетесь пропустить. Идет проект по добавлению динамической фильтрации , но он еще не завершен.

Это означает, что лучшее, что вы можете сделать сегодня, - это выполнить два отдельных запроса: один для получения максимального значения partition_id, а второй - для встроенного значения.

Кстати, скрытая таблица «$ partitions» была добавлена ​​в Presto 0.199 , и мы исправили некоторые незначительные ошибки в 0.201 . Я не уверен, на какой версии основана Athena, но я считаю, что она довольно устарела (текущий выпуск на момент написания этого ответа 309 .

1 голос
/ 25 июня 2019

РЕДАКТИРОВАТЬ : Presto удалила таблицу __internal_partitions__ в их 0.193 версии , поэтому я бы рекомендовал не использовать решение, определенное в разделе Slow aggregation queries for partition keys ниже, в любых производственных системах, поскольку Афина «прозрачно» обновляет версии Presto. В итоге я просто пошел по наивному запросу SELECT max(partition_date) ..., но также использовал тот же трюк с просмотром, описанный в разделе Lack of Dynamic Filtering. Это примерно в 3 раза медленнее, чем при использовании таблицы __internal_partitions__, но, по крайней мере, она не сломается, когда Афина решит обновить свою предварительную версию.

----- Исходное сообщение -----

Так что я нашел довольно хакерский способ сделать это для основанных на дате разделов в больших наборах данных, когда вам нужно только оглянуться на данные нескольких разделов для совпадения на максимуме, однако Обратите внимание, что я не уверен на 100%, насколько хрупким является использование таблицы information_schema.__internal_partitions__.

Как отмечалось выше @Dain, на самом деле есть две проблемы. Во-первых, насколько медленной является агрегация запроса max (partition_date), а во-вторых, отсутствие поддержки Presto динамической фильтрации.

Медленные запросы агрегации для ключей секционирования

Для решения первой проблемы я использую таблицу information_schema.__internal_partitions__, которая позволяет быстро получать агрегации по разделам таблицы без сканирования данных внутри файлов. (Обратите внимание, что partition_value, partition_key и partition_number в приведенных ниже запросах являются именами столбцов таблицы __internal_partitions__ и не связаны со столбцами вашей таблицы)

Если у вас есть только один ключ раздела для вашей таблицы, вы можете сделать что-то вроде:

SELECT max(partition_value) FROM information_schema.__internal_partitions__
WHERE table_schema = 'DATABASE_NAME' AND table_name = 'TABLE_NAME'

Но если у вас несколько ключей разделов, вам понадобится что-то вроде этого:

SELECT max(partition_date) as latest_partition_date from (
  SELECT max(case when partition_key = 'partition_date' then partition_value end) as partition_date, max(case when partition_key = 'another_partition_key' then partition_value end) as another_partition_key
  FROM information_schema.__internal_partitions__
  WHERE table_schema = 'DATABASE_NAME' AND table_name = 'TABLE_NAME'
  GROUP BY partition_number
)
WHERE
  -- ... Filter down by values for e.g. another_partition_key
)

Эти запросы должны выполняться довольно быстро (мой выполняется примерно за 1-2 секунды) без сканирования реальных данных в файлах, но, опять же, я не уверен, есть ли какие-то ошибки с использованием этого подхода.

Отсутствие динамической фильтрации

Я могу смягчить наихудшие последствия второй проблемы для моего конкретного варианта использования, потому что я ожидаю, что всегда будет разделение в течение конечного промежутка времени от текущей даты (например, я могу гарантировать любые данные - проблемы с производством или загрузкой разделов будут исправлены в течение 3 дней). Оказывается, что Athena выполняет некоторую предварительную обработку при использовании функций dateto * presto , поэтому с динамической фильтрацией у нее не возникают проблемы, связанные с использованием подзапроса.

Таким образом, вы можете изменить свой запрос, чтобы ограничить, насколько далеко он будет искать фактический максимум, используя функции datetime, чтобы количество сканируемых данных было ограничено.

SELECT * FROM "DATABASE_NAME"."TABLE_NAME"
WHERE partition_date >= cast(date '2019-06-25' - interval '3' day as varchar) -- Will only scan partitions from 3 days before '2019-06-25'
AND partition_date = (
  -- Insert the partition aggregation query from above here
)
...