Результаты страницы Apache Spark или просмотреть результаты для больших наборов данных - PullRequest
0 голосов
/ 24 апреля 2018

Я использую Hive с Spark 1.6.3

У меня большой набор данных (40000 строк, 20 столбцов или около того, и каждый столбец содержит, возможно, 500 байт - 3 КБ данных)

запрос представляет собой соединение с 3 наборами данных

Я хочу, чтобы я мог отобразить окончательный набор данных объединения, и я обнаружил, что могу использовать row_number() OVER (ORDER BY 1) для генерации уникального номера строки для каждой строки в наборе данных.

После этого я могу сделать

SELECT * FROM dataset WHERE row between 1 AND 100

Однако есть ресурсы, которые советуют не использовать ORDER BY, поскольку он помещает все данные в 1 раздел (я вижу, что это такжурналы, в которых при распределении в случайном порядке данные перемещаются в один раздел), когда это происходит, я получаю исключения из памяти.

Как мне более эффективно выполнять разбиение по страницам набора данных?

Я включил постоянство - MEMORY_AND_DISK, чтобы, если раздел слишком большой, он попадал на диск (и для некоторых преобразований я вижу, что, по крайней мере, некоторые данные выливаются на диск, когда я не использую *)1019 *)

Ответы [ 2 ]

0 голосов
/ 24 апреля 2018

Одной стратегией может быть сначала выбор только уникального ключа набора данных и применение функции row_number только к этому набору данных. Поскольку вы выбираете один столбец из большого набора данных, вероятность того, что он поместится в одном разделе, выше.

val dfKey = df.select("uniqueKey")
dfKey.createOrUpdateTempTable("dfKey")
val dfWithRowNum = spark.sql(select dfKey*, row_number() as row_number OVER (ORDER BY 1))
// save dfWithRowNum

После завершения операции row_number для uniqueKey; сохранить этот фрейм данных Теперь на следующем этапе присоедините этот фрейм данных к большему фрейму данных и добавьте к нему столбец row_number.

dfOriginal.createOrUpdateTempTable("dfOriginal")
dfWithRowNum.createOrUpdateTempTable("dfWithRowNum")
val joined = spark.sql("select dfOriginal.* from dfOriginal join dfWithRowNum on dfOriginal.uniqueKey = dfWithRowNum.uniqueKey")
// save joined

Теперь вы можете запросить

SELECT * FROM joineddataset WHERE row between 1 AND 100

Для сохранения с MEMORY_DISK я обнаружил, что иногда происходит сбой при недостатке памяти. Я предпочел бы использовать DISK_ONLY, где производительность ухудшается, хотя выполнение гарантировано.

0 голосов
/ 24 апреля 2018

Ну, вы можете применить этот метод к вашему окончательному фрейму данных соединения.

Вы должны также сохранить фрейм данных в виде файла, чтобы гарантировать порядок, так как повторная оценка может создать разный заказ .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...