Предварительная группировка таблиц в HDFS и чтение в Spark с нулевым перемешиванием - PullRequest
0 голосов
/ 24 августа 2018

Контекст

У меня есть две таблицы, к которым я присоединяюсь / объединяюсь как часть моих искровых заданий, что приводит к большому перемешиванию при каждом запуске задания.Я хочу амортизировать затраты по всем заданиям, сохраняя однотипные данные, и использую уже сгруппированные данные как часть моих регулярных прогонов Spark, чтобы избежать случайного перемешивания.

Чтобы попытаться добиться этого, у меня есть некоторые данные вHDFS хранится в формате паркета.Я использую повторяющиеся поля Parquet для получения следующей схемы

(date, [aRecords], [bRecords])

Где [aRecords] указывает массив aRecord.Я также делю данные по дате в HDFS, используя обычные write.partitionBy($"date").

В этой ситуации aRecords и bRecords, по-видимому, эффективно сгруппированы по дате.Я могу выполнять операции, подобные следующим:

case class CogroupedData(date: Date, aRecords: Array[Int], bRecords: Array[Int])

val cogroupedData = spark.read.parquet("path/to/data").as[CogroupedData]

//Dataset[(Date,Int)] where the Int in the two sides multiplied
val results = cogroupedData
    .flatMap(el => el.aRecords.zip(el.bRecords).map(pair => (el.date, pair._1 * pair._2)))

и получать результаты, полученные от использования эквивалентных операций groupByKey над двумя отдельными таблицами для aRecords и bRecords, заданных по дате.

Разницамежду этими двумя случаями я избегаю перестановки с уже сгруппированными данными, стоимость сгруппированных амортизируется путем сохранения в HDFS.

Вопрос

Теперь к вопросу.Из набора сгруппированных данных я хотел бы получить два сгруппированных набора данных, чтобы я мог использовать стандартные SQL-операторы Spark (такие как cogroup, join и т. Д.) без перестановок .Это кажется возможным, поскольку первый пример кода работает, но Spark по-прежнему настаивает на хэшировании / перетасовке данных, когда я присоединяюсь к / groupByKey / cogroup и т. Д.

Взять приведенный ниже пример кода.Я ожидаю, что есть способ, которым мы можем выполнить нижеприведенное без перестановок при выполнении объединения.

val cogroupedData = spark.read.parquet("path/to/data").as[CogroupedData]

val aRecords = cogroupedData
    .flatMap(cog => cog.aRecords.map(a => (cog.date,a)))
val bRecords = cogroupedData
    .flatMap(cog => cog.bRecords.map(b => (cog.date,b)))

val joined = aRecords.join(bRecords,Seq("date"))

Если посмотреть на литературу, если cogroupedData имеет известный разделитель, то последующие операции не должнынесут случайность, так как они могут использовать тот факт, что СДР уже разделен и сохранить разделитель.

Мне кажется, что для этого мне нужно получить набор данных / rdd cogroupedData с известным разделителем без необходимости перемешивания.

Другие вещи, которые я уже пробовал:

  • Метаданные куста - отлично работает для простых объединений, но оптимизирует только начальное объединение, а не последующие преобразования.Hive также вообще не помогает cogroups

У кого-нибудь есть идеи?

1 Ответ

0 голосов
/ 24 августа 2018

Вы допустили две ошибки:

Правильный способ сделать это:

  • Использовать разбивку на блоки.

    val n: Int
    someDF.write.bucketBy(n, "date").saveAsTable("df")
    
  • Отбросить функциональный API в пользу SQL API:

    import org.apache.spark.sql.functions.explode
    
    val df = spark.table("df")
    
    val adf = df.select($"date", explode($"aRecords").alias("aRecords"))
    val bdf = df.select($"date", explode($"bRecords").alias("bRecords"))
    
    adf.join(bdf, Seq("date"))
    
...