Spark Enhance Join между терабайтами наборов данных - PullRequest
1 голос
/ 19 мая 2019

У меня есть пять Hive таблиц, предположим, что имена A, B, C, D и E. Для каждой таблицы есть ключ customer_id в качестве ключа для соединения между ними. Кроме того, каждая таблица содержит не менее 100: 600 столбцов, все они имеют формат Parquet.

Пример одной таблицы ниже:

CREATE TABLE table_a 
(
customer_id Long, 
col_1 STRING,
col_2 STRING,
col_3 STRING,
.
.
col_600 STRING
)
STORED AS PARQUET;

Мне нужно набрать два очка,

  • Соедините их вместе наиболее оптимальным способом, используя Spark Scala. Я пытался sortByKey до присоединения, но все еще существует узкое место в производительности. Я пытался набрать reparation по ключу перед соединением, но производительность все еще не была хорошей. Я пытался увеличить параллелизм для Spark, чтобы сделать его 6000 со многими исполнителями, но не смог достичь хороших результатов.
  • После объединения мне нужно применить отдельную функцию для некоторых из этих столбцов.

Пример объединения, которое я пробовал ниже,

val dsA =  spark.table(table_a)
val dsB =  spark.table(table_b) 
val dsC =  spark.table(table_c) 
val dsD =  spark.table(table_d) 
val dsE =  spark.table(table_e) 
val dsAJoineddsB = dsA.join(dsB,Seq(customer_id),"inner")

1 Ответ

0 голосов
/ 19 мая 2019

Я думаю, что в этом случае прямое соединение не является оптимальным случаем.Вы можете выполнить эту задачу следующим простым способом:

  • Во-первых, создайте класс дел, например FeatureData, с двумя полями case class FeatureData(customer_id:Long, featureValue:Map[String,String])
  • Во-вторых, вы сопоставите каждую таблицу сКлюч класса дела FeatureData, [feature_name, feature_value]
  • В-третьих, вы будете groupByKey и union все наборы данных с одним и тем же ключом.

Я выше, как это будетБыть быстрее объединяться, чем присоединяться.Но это требует дополнительной работы.

После этого у вас будет набор данных с ключом, картой.Вы примените преобразование для key, Map(feature_name).

Простой пример реализации выглядит следующим образом: сначала вы отобразите dataset на case class, а затем сможете объединить их все.После этого вы получите groupByKey, затем отобразите его и уменьшите.

case class FeatureMappedData(customer_id:Long, feature: Map[String, String])
val dsAMapped = dsA.map(row ⇒
        FeatureMappedData(row.customer_id,
          Map("featureA" -> row.featureA,
            "featureB" -> row.featureB)))
val unionDataSet = dsAMapped  union dsBMapped 
unionDataSet.groupByKey(_.customer_id)
      .mapGroups({
        case (eid, featureIter) ⇒ {
      val featuresMapped: Map[String, String] = featureIter.map(_.feature).reduce(_ ++ _).withDefaultValue("0") 
      FeatureMappedData(customer_id, featuresMapped)
    }
  })
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...