Не могу записать заказанные данные на паркет в свече - PullRequest
0 голосов
/ 04 сентября 2018

Я работаю с Apache Spark для создания паркетных файлов. Я могу разделить их по дате без проблем, но внутренне я не могу выложить данные в правильном порядке.

Порядок, кажется, теряется во время обработки, что означает, что метаданные паркета неверны (в частности, я хочу убедиться, что группы строк паркета отражают отсортированный порядок, чтобы запросы, специфичные для моего варианта использования, могли эффективно фильтроваться через метаданные) ,

Рассмотрим следующий пример:

// note: hbase source is a registered temp table generated from hbase
val transformed = sqlContext.sql(s"SELECT  id, sampleTime, ... , 
toDate(sampleTime) as date FROM hbaseSource")

// Repartion the input set by the date column ( in my source there should be 
2 distinct dates)
val sorted = transformed.repartition($"date").sortWithinPartitions("id", 
"sampleTime")

sorted.coalesce(1).write.partitionBy("date").parquet(s"/outputFiles")

При таком подходе я получаю правильную структуру паркетных перегородок (по дате). И что еще лучше, для каждого раздела дат я вижу один большой файл паркета.

 /outputFiles/date=2018-01-01/part-00000-4f14286c-6e2c-464a-bd96-612178868263.snappy.parquet

Однако, когда я запрашиваю файл, я вижу содержимое не по порядку. Точнее говоря, «не в порядке», похоже, что несколько упорядоченных разделов фрейма данных были объединены в файл.

Метаданные группы строк паркета показывают, что отсортированные поля фактически перекрываются (например, конкретный идентификатор может находиться во многих группах строк):

id:             :[min: 54, max: 65012, num_nulls: 0]
sampleTime:     :[min: 1514764810000000, max: 1514851190000000, num_nulls: 0]
id:             :[min: 827, max: 65470, num_nulls: 0]
sampleTime:     :[min: 1514764810000000, max: 1514851190000000, num_nulls: 0]
id:             :[min: 1629, max: 61412, num_nulls: 0]

Я хочу, чтобы данные были правильно упорядочены внутри каждого файла, чтобы метаданные мин / макс в каждой группе строк не перекрывались.

Например, это шаблон, который я хочу увидеть:

RG 0: id:             :[min: 54, max: 100, num_nulls: 0]
RG 1: id:             :[min: 100, max: 200, num_nulls: 0]

... где RG = "группа строк". Если бы я хотел id = 75, запрос мог бы найти его в одной группе строк.

Я пробовал много вариантов приведенного выше кода. Например, с * 1023 и без него (я знаю, что объединение - это плохо, но моя идея состояла в том, чтобы использовать его для предотвращения перемешивания). Я также попробовал sort вместо sortWithinPartitions (сортировка должна создать общую упорядоченную сортировку, но приведет к множеству разделов). Например:

val sorted = transformed.repartition($"date").sort("id", "sampleTime") 
sorted.write.partitionBy("date").parquet(s"/outputFiles")

Дает мне 200 файлов, что слишком много, и они все еще не отсортированы правильно. Я могу уменьшить количество файлов путем настройки размера в случайном порядке, но я ожидал, что сортировка будет обрабатываться по порядку во время записи (у меня сложилось впечатление, что запись не перемешивает ввод). Порядок, который я вижу, выглядит следующим образом (другие поля для краткости опущены):

+----------+----------------+
|id|      sampleTime|
+----------+----------------+
|     56868|1514840220000000|
|     57834|1514785180000000|
|     56868|1514840220000000|
|     57834|1514785180000000|
|     56868|1514840220000000|

Что выглядит как чередующиеся отсортированные разделы. Поэтому я думаю, что repartition здесь ничего не покупает, а sort, похоже, не в состоянии сохранить порядок на шаге записи.

Я читал, что то, что я хочу сделать, должно быть возможным. Я даже попробовал подход, изложенный в презентации «Настройка производительности паркета: «Отсутствующее руководство» Райана Блю (к сожалению, оно находится за платной платформой OReily). Это предполагает использование insertInto. В этом случае, похоже, спарк использовал старую версию parquet-mr, которая повредила метаданные, и я не уверен, как обновить его.

Я не уверен, что я делаю неправильно. У меня такое ощущение, что я неправильно понимаю, как repartition($"date") и sort работают и / или взаимодействуют.

Буду признателен за любые идеи. Извиняюсь за сочинение. :)

редактирование: Также обратите внимание, что если я сделаю show (n) на transformed.sort("id", "sampleTime"), данные будут отсортированы правильно. Таким образом, похоже, что проблема возникает на этапе записи. Как отмечалось выше, кажется, что вывод сортировки перемешивается во время записи.

1 Ответ

0 голосов
/ 04 сентября 2018

Просто идея, сортировка после объединения: ".coalesce (1) .sortWithinPartitions ()". Также ожидаемый результат выглядит странно - зачем заказывать данные в паркете? Сортировка после прочтения выглядит более уместно.

...