Как эффективно обновлять таблицы Impala, файлы которых изменяются очень часто - PullRequest
7 голосов
/ 06 февраля 2020

У нас есть решение на основе oop (CDH 5.15), где мы получаем новые файлы в HDFS в некоторых каталогах. Помимо этих каталогов у нас есть 4-5 таблиц Impala (2.1). Процесс записи этих файлов в HDFS - Spark Structured Streaming (2.3.1)

. Сейчас мы выполняем некоторые запросы DDL, как только мы получаем файлы, записанные в HDFS:

  • REFRESH table1 PARTITIONS (partition1=X, partition2=Y), используя все ключи для каждого раздела.

В настоящее время этот DDL занимает слишком много времени, и они ставятся в очередь в нашей системе, что ухудшает доступность данных системы.

Итак, мой вопрос: Есть ли способ сделать объединение данных более эффективным?

Мы рассмотрели:

  • Использование ALTER TABLE .. RECOVER PARTITONS, но согласно документация , обновляются только новые разделы.

  • Пытался использовать REFRESH .. PARTITON ... с несколькими разделами одновременно, но синтаксис оператора не позволяет этого сделать.

  • Попытка пакетной обработки запросов, но диски Hive JDB C не поддерживают пакетную обработку запросов.

  • Попробуем выполнить эти обновления в параллельно, учитывая, что система уже занята?

  • Любой другой способ вы знаете?

Спасибо!

Виктор

Примечание. Мы знаем, какие разделы нужно обновить, используя события HDFS, поскольку при Spark Structured Streaming мы не знаем точно, когда записываются файлы.

Примечание # 2: Кроме того, файлы, записанные в HDFS иногда бывают небольшими, поэтому было бы здорово, если бы можно было объединить эти файлы одновременно.

1 Ответ

0 голосов
/ 27 февраля 2020

Поскольку никто не может ответить на мою проблему, я хотел бы поделиться подходом, который мы использовали для повышения эффективности этой обработки, комментарии очень приветствуются.

Мы обнаружили (сделайте c. Is не очень ясно), что некоторая информация, хранящаяся в «контрольных точках» Spark в HDFS, представляет собой ряд файлов метаданных, описывающих, когда каждый файл Parquet был написан и насколько он велик:

$hdfs dfs -ls -h hdfs://...../my_spark_job/_spark_metadata

w-r--r--   3 hdfs 68K   2020-02-26 20:49 hdfs://...../my_spark_job/_spark_metadata/3248
rw-r--r--  3 hdfs 33.3M 2020-02-26 20:53 hdfs://...../my_spark_job/_spark_metadata/3249.compact
w-r--r--   3 hdfs 68K   2020-02-26 20:54 hdfs://...../my_spark_job/_spark_metadata/3250
...

$hdfs dfs -cat hdfs://...../my_spark_job/_spark_metadata/3250
v1
{"path":"hdfs://.../my_spark_job/../part-00004.c000.snappy.parquet","size":9866555,"isDir":false,"modificationTime":1582750862638,"blockReplication":3,"blockSize":134217728,"action":"add"}
{"path":"hdfs://.../my_spark_job/../part-00004.c001.snappy.parquet","size":526513,"isDir":false,"modificationTime":1582750862834,"blockReplication":3,"blockSize":134217728,"action":"add"}
...

Итак, то, что мы сделали, были:

  • Создание Spark Streaming Job опроса этой папки _spark_metadata.
    • Мы используем fileStream, поскольку он позволяет нам определить используемый фильтр файлов.
    • Каждая запись в этом потоке является одной из этих JSON строк, которые анализируются для извлечения путь и размер файла.
  • Сгруппируйте файлы по родительской папке (которая соответствует каждому разделу Impala), к которой они принадлежат.
  • Для каждой папки:
    • Чтение кадра данных, загрузка * только 1023 * целевых файлов Parquet (чтобы избежать условий гонки с другими заданиями при записи файлов)
    • Подсчитать, сколько блоков записать (используя поле размера в JSON и целевой размер блока)
    • Объединить кадр данных с требуемым количеством разделов и записать его обратно в HDFS
    • Выполнить DDL REFRESH TABLE myTable PARTITION ([partition keys derived from the new folder]
  • Наконец, удалите исходные файлы

Мы достигли:

  • Ограничьте DDL, выполнив один refre sh на раздел и пакет .

  • Имея настраиваемое время партии и размер блока, мы возможность адаптировать наш продукт к различным сценариям развертывания ios с большими или меньшими наборами данных.

  • Решение достаточно гибкое, поскольку мы можем назначать больше или меньше ресурсов для задания Spark Streaming ( исполнители, ядра, память и т. д. c.), а также мы можем запустить / остановить его (используя собственную систему контрольных точек).

  • Мы также изучаем возможность применения некоторых данных перераспределение при выполнении этого процесса, чтобы разделы были как можно ближе к наиболее оптимальному размеру.

...