Поскольку никто не может ответить на мою проблему, я хотел бы поделиться подходом, который мы использовали для повышения эффективности этой обработки, комментарии очень приветствуются.
Мы обнаружили (сделайте c. Is не очень ясно), что некоторая информация, хранящаяся в «контрольных точках» Spark в HDFS, представляет собой ряд файлов метаданных, описывающих, когда каждый файл Parquet был написан и насколько он велик:
$hdfs dfs -ls -h hdfs://...../my_spark_job/_spark_metadata
w-r--r-- 3 hdfs 68K 2020-02-26 20:49 hdfs://...../my_spark_job/_spark_metadata/3248
rw-r--r-- 3 hdfs 33.3M 2020-02-26 20:53 hdfs://...../my_spark_job/_spark_metadata/3249.compact
w-r--r-- 3 hdfs 68K 2020-02-26 20:54 hdfs://...../my_spark_job/_spark_metadata/3250
...
$hdfs dfs -cat hdfs://...../my_spark_job/_spark_metadata/3250
v1
{"path":"hdfs://.../my_spark_job/../part-00004.c000.snappy.parquet","size":9866555,"isDir":false,"modificationTime":1582750862638,"blockReplication":3,"blockSize":134217728,"action":"add"}
{"path":"hdfs://.../my_spark_job/../part-00004.c001.snappy.parquet","size":526513,"isDir":false,"modificationTime":1582750862834,"blockReplication":3,"blockSize":134217728,"action":"add"}
...
Итак, то, что мы сделали, были:
- Создание Spark Streaming Job опроса этой папки
_spark_metadata
. - Мы используем
fileStream
, поскольку он позволяет нам определить используемый фильтр файлов. - Каждая запись в этом потоке является одной из этих JSON строк, которые анализируются для извлечения путь и размер файла.
- Сгруппируйте файлы по родительской папке (которая соответствует каждому разделу Impala), к которой они принадлежат.
- Для каждой папки:
- Чтение кадра данных, загрузка * только 1023 * целевых файлов Parquet (чтобы избежать условий гонки с другими заданиями при записи файлов)
- Подсчитать, сколько блоков записать (используя поле размера в JSON и целевой размер блока)
- Объединить кадр данных с требуемым количеством разделов и записать его обратно в HDFS
- Выполнить DDL
REFRESH TABLE myTable PARTITION ([partition keys derived from the new folder]
- Наконец, удалите исходные файлы
Мы достигли:
Ограничьте DDL, выполнив один refre sh на раздел и пакет .
Имея настраиваемое время партии и размер блока, мы возможность адаптировать наш продукт к различным сценариям развертывания ios с большими или меньшими наборами данных.
Решение достаточно гибкое, поскольку мы можем назначать больше или меньше ресурсов для задания Spark Streaming ( исполнители, ядра, память и т. д. c.), а также мы можем запустить / остановить его (используя собственную систему контрольных точек).
Мы также изучаем возможность применения некоторых данных перераспределение при выполнении этого процесса, чтобы разделы были как можно ближе к наиболее оптимальному размеру.