Я работаю с Apache Spark для создания паркетных файлов. Я могу разделить их по дате без проблем, но внутренне я не могу выложить данные в правильном порядке.
Порядок, кажется, теряется во время обработки, что означает, что метаданные паркета неверны (в частности, я хочу убедиться, что группы строк паркета отражают отсортированный порядок, чтобы запросы, специфичные для моего варианта использования, могли эффективно фильтроваться через метаданные) ,
Рассмотрим следующий пример:
// note: hbase source is a registered temp table generated from hbase
val transformed = sqlContext.sql(s"SELECT id, sampleTime, ... ,
toDate(sampleTime) as date FROM hbaseSource")
// Repartion the input set by the date column ( in my source there should be
2 distinct dates)
val sorted = transformed.repartition($"date").sortWithinPartitions("id",
"sampleTime")
sorted.coalesce(1).write.partitionBy("date").parquet(s"/outputFiles")
При таком подходе я получаю правильную структуру паркетных перегородок (по дате). И что еще лучше, для каждого раздела дат я вижу один большой файл паркета.
/outputFiles/date=2018-01-01/part-00000-4f14286c-6e2c-464a-bd96-612178868263.snappy.parquet
Однако, когда я запрашиваю файл, я вижу содержимое не по порядку. Точнее говоря, «не в порядке», похоже, что несколько упорядоченных разделов фрейма данных были объединены в файл.
Метаданные группы строк паркета показывают, что отсортированные поля фактически перекрываются (например, конкретный идентификатор может находиться во многих группах строк):
id: :[min: 54, max: 65012, num_nulls: 0]
sampleTime: :[min: 1514764810000000, max: 1514851190000000, num_nulls: 0]
id: :[min: 827, max: 65470, num_nulls: 0]
sampleTime: :[min: 1514764810000000, max: 1514851190000000, num_nulls: 0]
id: :[min: 1629, max: 61412, num_nulls: 0]
Я хочу, чтобы данные были правильно упорядочены внутри каждого файла, чтобы метаданные мин / макс в каждой группе строк не перекрывались.
Например, это шаблон, который я хочу увидеть:
RG 0: id: :[min: 54, max: 100, num_nulls: 0]
RG 1: id: :[min: 100, max: 200, num_nulls: 0]
... где RG = "группа строк". Если бы я хотел id = 75, запрос мог бы найти его в одной группе строк.
Я пробовал много вариантов приведенного выше кода. Например, с * 1023 и без него (я знаю, что объединение - это плохо, но моя идея состояла в том, чтобы использовать его для предотвращения перемешивания). Я также попробовал sort
вместо sortWithinPartitions
(сортировка должна создать общую упорядоченную сортировку, но приведет к множеству разделов). Например:
val sorted = transformed.repartition($"date").sort("id", "sampleTime")
sorted.write.partitionBy("date").parquet(s"/outputFiles")
Дает мне 200 файлов, что слишком много, и они все еще не отсортированы правильно. Я могу уменьшить количество файлов путем настройки размера в случайном порядке, но я ожидал, что сортировка будет обрабатываться по порядку во время записи (у меня сложилось впечатление, что запись не перемешивает ввод). Порядок, который я вижу, выглядит следующим образом (другие поля для краткости опущены):
+----------+----------------+
|id| sampleTime|
+----------+----------------+
| 56868|1514840220000000|
| 57834|1514785180000000|
| 56868|1514840220000000|
| 57834|1514785180000000|
| 56868|1514840220000000|
Что выглядит как чередующиеся отсортированные разделы. Поэтому я думаю, что repartition
здесь ничего не покупает, а sort
, похоже, не в состоянии сохранить порядок на шаге записи.
Я читал, что то, что я хочу сделать, должно быть возможным. Я даже попробовал подход, изложенный в презентации «Настройка производительности паркета:
«Отсутствующее руководство» Райана Блю (к сожалению, оно находится за платной платформой OReily). Это предполагает использование insertInto
. В этом случае, похоже, спарк использовал старую версию parquet-mr, которая повредила метаданные, и я не уверен, как обновить его.
Я не уверен, что я делаю неправильно. У меня такое ощущение, что я неправильно понимаю, как repartition($"date")
и sort
работают и / или взаимодействуют.
Буду признателен за любые идеи. Извиняюсь за сочинение. :)
редактирование:
Также обратите внимание, что если я сделаю show (n) на transformed.sort("id", "sampleTime")
, данные будут отсортированы правильно. Таким образом, похоже, что проблема возникает на этапе записи. Как отмечалось выше, кажется, что вывод сортировки перемешивается во время записи.