Контекст
У меня есть две таблицы, к которым я присоединяюсь / объединяюсь как часть моих искровых заданий, что приводит к большому перемешиванию при каждом запуске задания.Я хочу амортизировать затраты по всем заданиям, сохраняя однотипные данные, и использую уже сгруппированные данные как часть моих регулярных прогонов Spark, чтобы избежать случайного перемешивания.
Чтобы попытаться добиться этого, у меня есть некоторые данные вHDFS хранится в формате паркета.Я использую повторяющиеся поля Parquet для получения следующей схемы
(date, [aRecords], [bRecords])
Где [aRecords] указывает массив aRecord.Я также делю данные по дате в HDFS, используя обычные write.partitionBy($"date")
.
В этой ситуации aRecords и bRecords, по-видимому, эффективно сгруппированы по дате.Я могу выполнять операции, подобные следующим:
case class CogroupedData(date: Date, aRecords: Array[Int], bRecords: Array[Int])
val cogroupedData = spark.read.parquet("path/to/data").as[CogroupedData]
//Dataset[(Date,Int)] where the Int in the two sides multiplied
val results = cogroupedData
.flatMap(el => el.aRecords.zip(el.bRecords).map(pair => (el.date, pair._1 * pair._2)))
и получать результаты, полученные от использования эквивалентных операций groupByKey над двумя отдельными таблицами для aRecords и bRecords, заданных по дате.
Разницамежду этими двумя случаями я избегаю перестановки с уже сгруппированными данными, стоимость сгруппированных амортизируется путем сохранения в HDFS.
Вопрос
Теперь к вопросу.Из набора сгруппированных данных я хотел бы получить два сгруппированных набора данных, чтобы я мог использовать стандартные SQL-операторы Spark (такие как cogroup, join и т. Д.) без перестановок .Это кажется возможным, поскольку первый пример кода работает, но Spark по-прежнему настаивает на хэшировании / перетасовке данных, когда я присоединяюсь к / groupByKey / cogroup и т. Д.
Взять приведенный ниже пример кода.Я ожидаю, что есть способ, которым мы можем выполнить нижеприведенное без перестановок при выполнении объединения.
val cogroupedData = spark.read.parquet("path/to/data").as[CogroupedData]
val aRecords = cogroupedData
.flatMap(cog => cog.aRecords.map(a => (cog.date,a)))
val bRecords = cogroupedData
.flatMap(cog => cog.bRecords.map(b => (cog.date,b)))
val joined = aRecords.join(bRecords,Seq("date"))
Если посмотреть на литературу, если cogroupedData имеет известный разделитель, то последующие операции не должнынесут случайность, так как они могут использовать тот факт, что СДР уже разделен и сохранить разделитель.
Мне кажется, что для этого мне нужно получить набор данных / rdd cogroupedData с известным разделителем без необходимости перемешивания.
Другие вещи, которые я уже пробовал:
- Метаданные куста - отлично работает для простых объединений, но оптимизирует только начальное объединение, а не последующие преобразования.Hive также вообще не помогает cogroups
У кого-нибудь есть идеи?