Перекошенная функция окна и исходные разделы улья? - PullRequest
1 голос
/ 24 июня 2019

Данные, которые я читаю через Spark, являются сильно искаженными Hive Table со следующей статистикой.

(MIN, 25th, MEDIAN, 75TH, MAX) через интерфейс Spark:

1506.0 B / 0 232.4 KB / 27288 247.3 KB / 29025 371.0 KB / 42669 269.0 MB / 27197137

Я считаю, что это вызывает проблемы в работе, когда я выполняю Window Funcs и Pivots.

Я попытался использовать этот параметр, чтобы ограничить размер раздела, но ничего не изменилось, и разделы все еще перекошены при чтении.

spark.conf.set("spark.sql.files.maxPartitionBytes")

Кроме того, когда я кеширую этот DF с таблицей Hive в качестве источника, это занимает несколько минут и даже вызывает некоторый сборщик мусора в интерфейсе Spark, скорее всего, из-за перекоса.

Это spark.sql.files.maxPartitionBytes работает с таблицами Hive или только с файлами?

Как лучше всего работать с этим искаженным источником Hive?

Подойдет ли для этой задачи что-то вроде сценического барьера, написанного на паркете или посола?

Я бы хотел избежать .repartition() при чтении, поскольку это добавляет еще один слой к уже загруженным данным американских горок.

Спасибо

=============================================== ===

После дальнейших исследований выясняется, что Window Function также вызывает искажение данных, и именно здесь Spark Job зависает.

Я выполняю time series заполнение с помощью двойной Window Function (вперед, затем назад, чтобы вменять все показания датчика null), и я пытаюсь следовать этой статье, чтобы попробовать salt метод для равномерного распределения .. Однако следующий код выдает все значения null, поэтому метод salt не работает.

Не уверен, почему я получаю skews после Window, поскольку каждый элемент измерения, по которому я делю раздел, имеет примерно одинаковое количество записей после проверки с помощью .groupBy() ... таким образом, зачем нужен salt?

+--------------------+-------+
|          measure   |  count|
+--------------------+-------+
|    v1              |5030265|
|      v2            |5009780|
|     v3             |5030526|
| v4                 |5030504|
...

Солонка => https://medium.com/appsflyer/salting-your-spark-to-scale-e6f1c87dd18

nSaltBins = 300 # based off number of "measure" values
df_fill = df_fill.withColumn("salt", (F.rand() * nSaltBins).cast("int"))

# FILLS [FORWARD + BACKWARD]
window = Window.partitionBy('measure')\
               .orderBy('measure', 'date')\
               .rowsBetween(Window.unboundedPreceding, 0)

# FORWARD FILLING IMPUTER
ffill_imputer = F.last(df_fill['new_value'], ignorenulls=True)\
.over(window)
fill_measure_DF = df_fill.withColumn('value_impute_temp', ffill_imputer)\
.drop("value", "new_value")

window = Window.partitionBy('measure')\
               .orderBy('measure', 'date')\
               .rowsBetween(0,Window.unboundedFollowing)

# BACKWARD FILLING IMPUTER
bfill_imputer = F.first(df_fill['value_impute_temp'], ignorenulls=True)\
.over(window)
df_fill = df_fill.withColumn('value_impute_final', bfill_imputer)\
.drop("value_impute_temp")

1 Ответ

2 голосов
/ 24 июня 2019

Решение на основе улья:

Вы можете включить оптимизацию Skew join, используя конфигурацию улья. Применимые настройки:

set hive.optimize.skewjoin=true;
set hive.skewjoin.key=500000;
set hive.skewjoin.mapjoin.map.tasks=10000;
set hive.skewjoin.mapjoin.min.split=33554432;

См. Советы для этого:

перекос подсказки может работать в этом случае

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...