Оптимизация чтения данных для запуска из Azure blob - PullRequest
2 голосов
/ 25 февраля 2020

У нас есть данные, хранящиеся для таблицы в Azure хранилище BLOB-объектов, которое действует как озеро данных. Данные поступают каждые 30 минут, образуя временные разделы, как показано ниже в UT C

<Container>/<TableName>/y=2020/m=02/d=20/h=01/min=00
<Container>/<TableName>/y=2020/m=02/d=20/h=01/min=30
<Container>/<TableName>/y=2020/m=02/d=20/h=02/min=00 and so on. 

Формат файла, используемый для захвата данных, равен или c, а разделы данных во временном разделе имеют одинаковый размер.

У нас есть сценарий использования для сбора данных на уровне дня в IST с использованием Spark (V 2.3) для обработки. Учитывая, что данные находятся в UT C, а вариант использования - для обработки данных в IST (+5,30 UT C), в общей сложности необходимо 48 временных разделов от / h = 18 / min = 30 (предыдущий день) до / ч = 18 / мин = 00 (следующий день). У нас есть два варианта:

Опция 1 Создание фреймов данных для каждого временного раздела и их объединение

df1 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=20/h=18/min=30)
df2 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=20/h=19/min=00) 
..
df48 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=21/h=18/min=00) ..
df = df.union(df1) 
df = df.union(df2) 
..
df = df.union(df48)

В результате для 48 разделов будет получен целый день. данные в формате df.

Вариант 2 Захват данных на уровне дня и применение условия фильтра в течение часа.

df1 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=20/).filter(h>=19 or (h=18 and min=30))
df2 = SparkRead(<Container>/<TableName>/y=2020/m=02/d=21/).filter(h<=17 or (h=18 and min=00))
df = df1.union(df2)

Как только данные загружены в память, время, необходимое для обработки, остается тем же, т. Е. ~ 5 мин. Время, необходимое для загрузки данных, является узким местом. Вариант 1 занимает 30 минут, а вариант 2 - 2 минуты для загрузки в память.

В нескольких блогах мы видели, что Analyzer просматривает весь предыдущий кадр данных каждый раз, когда вызывается union. Таким образом, для 48 союзов он сканирует 1 + 2 + 3 + 47 = 1128 раз. Это причина экспоненциального снижения производительности? Что делает Analyzer, можно ли его отключить? Чтобы сделать функцию чтения обобщенной c для данных с временным разделением в хранилище файлов, есть ли какие-либо предложения или рекомендации для принятия?

Ответы [ 2 ]

0 голосов
/ 04 марта 2020

Подождите минутку ... разве файлы не имеют какого-то соглашения об именах? Я имею в виду, если файлы названы по сути одинаковыми, за исключением часов и минут.

Примерно так: filter (h> = 19 или (h = 18 и min = 30))

Просто перебирайте файлы, используя подстановочный знак, и объединяйте все в один фрейм данных.

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("sep", "|")
    .load("mnt/<Container>/<TableName>/y=2020/m=02/d=20/h*.gz")
    .withColumn("file_name", input_file_name())

Если схема отсутствует в самом файле или если она по какой-то причине является неполной, вы можете создать ее и переопределить содержимое этого файла.

val customSchema = StructType(Array(
    StructField("field1", StringType, true),
    StructField("field2", StringType, true),
    StructField("field3", StringType, true),
    etc.

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("sep", "|")
    .load("mnt/<Container>/<TableName>/y=2020/m=02/d=20/h*.gz")
    .withColumn("file_name", input_file_name())

Попробуйте это и посмотрим, как вы ладите.

0 голосов
/ 26 февраля 2020

Объединение фреймов данных приводит к тому, что Analyzer перебирает все предыдущие фреймы данных. В первую очередь это делается для того, чтобы вывести схему из файлов или c и выдать ошибку в случае несоответствия. Мы наблюдали большое количество файловых операций во время каждого объединения.

Опция 1 Поскольку в каждом временном разделе содержится> 200 файловых разделов, общее количество проходов, выполненных анализатором, составило 1 + 2 + .. + 47 = 1128. Это число, умноженное на 200, представляет собой число операций закрытия схемы анализа файлов = 225 600. Это было основной причиной того, что вариант 1 приводил к 30 минутам.

Вариант 2 Вариант 2 выполнял ту же операцию, но на двух больших фреймах данных. Один из предыдущего дня (с 18.30 до 23.30), а другой на следующий день (с 00.00 до 18.00). В результате было выполнено 22 + 26 = 48x200 = 9600 операций открытия-анализа файлов при закрытии схемы.

Чтобы смягчить это, мы указали схему, а не полагались на механизм вывода схемы Spark. Как вариант 1, так и вариант 2 были выполнены менее чем за 2 минуты после указания схемы.

Обучение. Использование механизма логического вывода схемы искры является дорогостоящим, если происходит значительное объединение / слияние наборов данных. В первую очередь из-за большого количества файловых операций. Это может быть оптимизацией в Spark, чтобы избежать повторного обхода фрейма данных, если схема уже была выведена в предыдущей операции. Пожалуйста, укажите схему, чтобы смягчить это.

...