Сортировка по ключам разделов во время INSERT INTO (Parquet) TABLE с Impala - PullRequest
0 голосов
/ 20 февраля 2019

У меня есть задание ETL, где я хочу добавить данные из CSV-файла в таблицу Impala.В настоящее время я делаю это путем создания временной внешней таблицы .csv с новыми данными (в формате .csv.lzo), после чего она вставляется в основную таблицу.

Используемый мной запрос выглядит следующим образом:

INSERT INTO TABLE main_table
PARTITION(yr, mth)
SELECT
    *,
    CAST(extract(ts, "year") AS SMALLINT) AS yr,
    CAST(extract(ts, "month") AS TINYINT) AS mth
FROM csv_table

, где main_table определяется следующим образом (несколько усеченных столбцов):

CREATE TABLE IF NOT EXISTS main_table (
    tid             INT,
    s1              VARCHAR,
    s2              VARCHAR,
    status          TINYINT,
    ts              TIMESTAMP,
    n1              DOUBLE,
    n2              DOUBLE,
    p               DECIMAL(3,2),
    mins            SMALLINT,
    temp            DOUBLE
)
PARTITIONED BY (yr SMALLINT, mth TINYINT)
STORED AS PARQUET

Данные имеют порядок нескольких ГБ (55 миллионов строк с приблизительно 30 столбцами).), и это займет более часа, чтобы бежать.Мне было любопытно, почему это так (поскольку это кажется довольно длинным для чего-то, что по сути является операцией добавления), и я столкнулся с этим в плане запроса:

F01:PLAN FRAGMENT [HASH(CAST(extract(ts, 'year') AS SMALLINT),CAST(extract(ts, 'month') AS TINYINT))] hosts=2 instances=2
|  Per-Host Resources: mem-estimate=1.01GB mem-reservation=12.00MB thread-reservation=1
WRITE TO HDFS [default.main_table, OVERWRITE=false, PARTITION-KEYS=(CAST(extract(ts, 'year') AS SMALLINT),CAST(extract(ts, 'month') AS TINYINT))]
|  partitions=unavailable
|  mem-estimate=1.00GB mem-reservation=0B thread-reservation=0
|
02:SORT
|  order by: CAST(extract(ts, 'year') AS SMALLINT) ASC NULLS LAST, CAST(extract(ts, 'month') AS TINYINT) ASC NULLS LAST
|  materialized: CAST(extract(ts, 'year') AS SMALLINT), CAST(extract(ts, 'month') AS TINYINT)
|  mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
|  tuple-ids=1 row-size=1.29KB cardinality=unavailable
|  in pipelines: 02(GETNEXT), 00(OPEN)
|
01:EXCHANGE [HASH(CAST(extract(ts, 'year') AS SMALLINT),CAST(extract(ts, 'month') AS TINYINT))]
|  mem-estimate=2.57MB mem-reservation=0B thread-reservation=0
|  tuple-ids=0 row-size=1.28KB cardinality=unavailable
|  in pipelines: 00(GETNEXT)
|

Очевидно, большую часть времении ресурсы расходуются на сортировку ключей разделов:

Operator       #Hosts  Avg Time  Max Time   #Rows  Est. #Rows  Peak Mem  Est. Peak Mem  Detail                                                                                          
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
02:SORT             2    17m16s    30m50s  55.05M          -1  25.60 GB       12.00 MB                                                                                                  
01:EXCHANGE         2   9s493ms  12s822ms  55.05M          -1  26.98 MB        2.90 MB  HASH(CAST(extract(ts, 'year') AS SMALLINT),CAST(extract(ts, 'month') AS TINYINT)) 
00:SCAN HDFS        2  51s958ms     1m10s  55.05M          -1  76.06 MB      704.00 MB  default.csv_table

Почему Импала должна это делать?Есть ли способ разделить таблицу без необходимости сортировки по ключам разделов или способ ускорить ее в моем случае, когда у всего файла .csv, который я пытаюсь добавить, есть только 1 или 2 ключа раздела?

РЕДАКТИРОВАТЬ: Оказывается, это наиболее вероятно, потому что я использую формат файла Parquet.Мой вопрос все еще применим, хотя: есть ли способ ускорить сортировку, когда я знаю, что сортировка практически не требуется?

Для сравнения, операция типа SELECT COUNT(*) FROM csv_table WHERE extract(ts, "year") = 2018 AND extract(ts, "month") = 1 занимает около 2-3 минут, тогда какORDER BY (как сделано во время вставки) занимает более часа.В этом примере были только ключи (2018,1) и (2018,2).

1 Ответ

0 голосов
/ 25 февраля 2019

Impala выполняет сортировку, потому что вы используете динамическое разбиение.Особенно с таблицами с вычисляемой статистикой impala не очень хорошо справляется с динамическим разбиением.Советую использовать улей в случае динамических разделов.Если вы не собираетесь использовать куст, я советую:

  1. Выполнять вычисления статистики в таблице csv перед каждой вставкой в ​​операторы.
  2. Если первый шаг не работает, используйте статический раздел для нескольких возможных разделов и запустите динамический раздел, выходящий за пределы возможных диапазонов.Например;если есть одна опция года и месяца:
INSERT 
INTO TABLE main_table
PARTITION(yr=2019, mth=2)
SELECT
    *
FROM csv_table where CAST(extract(ts, "year") AS SMALLINT)=2019 and CAST(extract(ts, "month") AS TINYINT)=2;  
INSERT INTO TABLE main_table
PARTITION(yr, mth)
SELECT
    *,
    CAST(extract(ts, "year") AS SMALLINT),
    CAST(extract(ts, "month") AS TINYINT)
FROM csv_table where CAST(extract(ts, "year") AS SMALLINT)!=2019 and CAST(extract(ts, "month") AS TINYINT)!=2;

Эти операторы сокращают набор, который будет иметь дело с динамическим разделом.И ожидается уменьшение общего времени, потраченного.

...