Эффективный способ чтения паркетных файлов между диапазонами дат в Azure Databricks - PullRequest
0 голосов
/ 28 февраля 2019

Я хотел бы знать, является ли приведенный ниже псевдокод эффективным методом чтения нескольких файлов паркета между диапазонами дат, хранящимися в озере данных Azure, из PySpark (блоки данных Azure).Примечание: файлы паркета не разбиты по дате.

Я использую соглашение uat / EntityName / 2019/01/01 / EntityName_2019_01_01_HHMMSS.parquet для хранения данных в ADL, как предложено в книге Большие данные Натана Марца с небольшиммодификация (с использованием 2019 вместо года = 2019).

Чтение всех данных с использованием * подстановочного знака:

df = spark.read.parquet(uat/EntityName/*/*/*/*)

Добавление столбца FileTimestamp, который извлекает метку времени из EntityName_2019_01_01_HHMMSS.parquet с использованием строковой операции и преобразования вTimestampType ()

df.withColumn(add timestamp column)

Используйте фильтр для получения соответствующих данных:

start_date = '2018-12-15 00:00:00'
end_date = '2019-02-15 00:00:00'
df.filter(df.FileTimestamp >= start_date).filter(df.FileTimestamp < end_date)

По сути, я использую PySpark для имитации аккуратного синтаксиса, доступного в U-SQL:

@rs = 
  EXTRACT 
      user    string,
      id      string,
      __date  DateTime
  FROM 
    "/input/data-{__date:yyyy}-{__date:MM}-{__date:dd}.csv"
  USING Extractors.Csv();

@rs = 
  SELECT * 
  FROM @rs
  WHERE 
    date >= System.DateTime.Parse("2016/1/1") AND
    date < System.DateTime.Parse("2016/2/1");

1 Ответ

0 голосов
/ 03 марта 2019

Правильный способ разделения ваших данных - это использовать для данных форму года = 2019, месяц = ​​01 и т. Д.

Когда вы запрашиваете эти данные с помощью фильтра, такого как:

df.filter(df.year >= myYear)

Тогда Spark будет читать только соответствующие папки.

Очень важно, чтобы имя столбца фильтрации отображалось точно в имени папки.Обратите внимание, что когда вы пишете секционированные данные с помощью Spark (например, по году, месяцу, дню), он не будет записывать столбцы секционирования в файл паркета.Они вместо этого выведены из пути.Это означает, что ваш dataframe потребует их при записи.Они также будут возвращаться в виде столбцов при чтении из многораздельных источников.

Если вы не можете изменить структуру папок, вы всегда можете вручную уменьшить количество папок, которые Spark может прочитать с помощью регулярных выражений или Glob - эта статья должна предоставить больше контекста Spark SQL-запросы к секционированным данным с использованием диапазонов дат .Но очевидно, что это более сложная и ручная инструкция.

ОБНОВЛЕНИЕ : еще один пример Можно ли прочитать несколько файлов в кадре данных Spark из S3, пропуская несуществующие?

Также из «Spark - Полное руководство: обработка больших данных стала проще» Билла Чамберса:

Секционирование - это инструмент, который позволяет вам контролировать, какие данные хранятся (и где) какты пишешь это.Когда вы записываете файл в многораздельный каталог (или таблицу), вы в основном кодируете столбец как папку.Это позволяет вам пропускать большое количество данных, когда вы собираетесь читать их позже, что позволяет вам читать только те данные, которые имеют отношение к вашей проблеме, вместо того, чтобы сканировать полный набор данных....

Это, вероятно, самая низкая оптимизация, которую вы можете использовать, когда у вас есть таблица, по которой читатели часто фильтруют перед манипулированием.Например, дата особенно характерна для раздела, потому что в нисходящем направлении мы часто хотим просматривать только данные за предыдущую неделю (вместо сканирования всего списка записей).

...