Оптимизация работы Spark: есть ли способ настроить работу Spark, в которой слишком много соединений - PullRequest
0 голосов
/ 12 октября 2018

У меня есть Spark-приложение, которое объединяет 11 таблиц.В основном, для денормализации таблицы фактов, объединяя все таблицы измерений.Присоединение происходит на Spark.Все таблицы находятся в TiDB.Задание подключается с использованием соединения jdbc

В настоящее время пакет составляет всего 15 минут, число строк в таблице составляет от 10 000 до 15 000.Есть ли какие-либо параметры настройки для соединений.Любой код, который можно оптимизировать.Есть лучший способ сделать это?

Фрагмент кода

val factTable = sparkSession.sql("select col1,col2,col3... from fact_table where last_modified_time between lowerBound and higerbound") 

//Get only the rows required from a dimension tables by generating a where clause
//This generates dim1_id=122 OR dim1_id=123 OR dim1_id=124 OR ...

val dim1TableFilter = factTable.map(fact => s"dim1_id = ${fact.dim1_id}").dropDuplicates().reduce(_+" OR "+_)

val dim1Table = sparkSession.sql(s"select col1,col2,col3.... from dim1Table where ${dim1TableFilter}")

val dim2TableFilter = factTable.map(fact => s"dim2_id = ${fact.dim2_id}").dropDuplicates().reduce(_+" OR "+_)
val dim2Table = sparkSession.sql(s"select col1,col2,col3.... from dim2Table where ${dim2TableFilter}")

val dim3TableFilter = factTable.map(fact => s"dim3_id = ${fact.dim3_id}").dropDuplicates().reduce(_+" OR "+_)
val dim3Table = sparkSession.sql(s"select col1,col2,col3.... from dim3Table where ${dim3TableFilter}")
...
....
...... so on 

// Finally join fact tables with dimension tables

val denormalisedTable = factTable.join(dim1Table,Seq("dim1_id"))
    .join(dim2Table,Seq("dim2_id"))
    .join(dim3Table,Seq("dim3_id"))
    .join(dim4Table,Seq("di4_id"))
    .join(dim5Table,Seq("dim5_id"))
    .join(dim6Table,Seq("dim6_id"))
    .join(dim7Table,Seq("dim7_id"))
    .join(dim8Table,Seq("dim8_id"))
    .join(dim9Table,Seq("dim9_id"))
    .join(dim10Table,Seq("dim10_id"))
    .join(dim11Table,Seq("dim11_id"))

// Push the batch to Kafka
 denormalisedTable
      .select(to_json(keyColumns).as("key"), to_json(struct(col1,col2,col3...)).as("value"), current_timestamp().as("timestamp"))
      .selectExpr("CAST(key as STRING)", "CAST(value as STRING)", "CAST(timestamp as LONG)")
      .write
      .format("kafka")
      .options(PropertiesParser.getKafkaConf())
      .option("topic", topicName)
      .save()

1 Ответ

0 голосов
/ 12 октября 2018

Одной из вещей, которую вы можете оценить, является попытка использовать соединение на стороне карты.Объединения на стороне карты могут быть очень хороши для объединений между большой таблицей (факт) с относительно небольшими таблицами (измерениями), которые затем можно использовать для выполнения соединения по схеме «звезда».По сути, это позволяет избежать отправки больших таблиц по сети для обычных хеш-соединений!

См .: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-joins-broadcast.html

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...