У меня есть задание Spark, которое внутренне объединяет большую таблицу Hive (5 млрд строк, 400 МБ x 1000 разделов, сжатый паркет) с таблицей гораздо меньшего размера, которая может содержать менее нескольких сотен строк, а в некоторые / большинство недель можетбыть пустым.
Данные в большой таблице не разбиты на разделы и не разбиты на блоки ключом объединения, и в любом случае ключ объединения очень сильно перекошен, так что попытка соединения без широковещательной передачи приводит к тому, что некоторые исполнители превышают пределы памяти.
К счастью, меньший размер таблицы всегда будет намного ниже порога широковещания, поэтому с помощью broadcast(rhs)
я могу избежать перетасовки большого набора данных с помощью перекошенного ключа.
Теперь, когда RHS пустSpark, похоже, выполняет большую часть работы, когда кажется достаточно очевидным, что результатом будет пустой набор данных.
Я могу только предположить, что Spark не проверяет наличие пустых наборов данных перед (внутренним) объединением, поскольку проверка может бытьдорого, но был бы признателен за окончательный ответ.
В моем случае я знаю, что RHS будет смаТак что вызов rhs.rdd.count будет дешев, и я могу пропустить объединение, если в этом нет необходимости.
Мне пришлось опустить код, чувствительный к бизнесу, но основной алгоритм:
// Note small and large tables are cached for later re-use
smallTable
// Complex DAG
// write to hive
.cache
largeTable
// read from hive
.cache
largeTable.as("l")
.join(broadcast(smallTable.as("r")), $"l.key" === $"r.key", "inner")
.select($"l.*")
.as[LargeTable]
.mapPartitions(mapPartitionsFunction)
Спасибодля любого понимания.
Терри.