У меня есть Spark-приложение, которое объединяет 11 таблиц.В основном, для денормализации таблицы фактов, объединяя все таблицы измерений.Присоединение происходит на Spark.Все таблицы находятся в TiDB.Задание подключается с использованием соединения jdbc
В настоящее время пакет составляет всего 15 минут, число строк в таблице составляет от 10 000 до 15 000.Есть ли какие-либо параметры настройки для соединений.Любой код, который можно оптимизировать.Есть лучший способ сделать это?
Фрагмент кода
val factTable = sparkSession.sql("select col1,col2,col3... from fact_table where last_modified_time between lowerBound and higerbound")
//Get only the rows required from a dimension tables by generating a where clause
//This generates dim1_id=122 OR dim1_id=123 OR dim1_id=124 OR ...
val dim1TableFilter = factTable.map(fact => s"dim1_id = ${fact.dim1_id}").dropDuplicates().reduce(_+" OR "+_)
val dim1Table = sparkSession.sql(s"select col1,col2,col3.... from dim1Table where ${dim1TableFilter}")
val dim2TableFilter = factTable.map(fact => s"dim2_id = ${fact.dim2_id}").dropDuplicates().reduce(_+" OR "+_)
val dim2Table = sparkSession.sql(s"select col1,col2,col3.... from dim2Table where ${dim2TableFilter}")
val dim3TableFilter = factTable.map(fact => s"dim3_id = ${fact.dim3_id}").dropDuplicates().reduce(_+" OR "+_)
val dim3Table = sparkSession.sql(s"select col1,col2,col3.... from dim3Table where ${dim3TableFilter}")
...
....
...... so on
// Finally join fact tables with dimension tables
val denormalisedTable = factTable.join(dim1Table,Seq("dim1_id"))
.join(dim2Table,Seq("dim2_id"))
.join(dim3Table,Seq("dim3_id"))
.join(dim4Table,Seq("di4_id"))
.join(dim5Table,Seq("dim5_id"))
.join(dim6Table,Seq("dim6_id"))
.join(dim7Table,Seq("dim7_id"))
.join(dim8Table,Seq("dim8_id"))
.join(dim9Table,Seq("dim9_id"))
.join(dim10Table,Seq("dim10_id"))
.join(dim11Table,Seq("dim11_id"))
// Push the batch to Kafka
denormalisedTable
.select(to_json(keyColumns).as("key"), to_json(struct(col1,col2,col3...)).as("value"), current_timestamp().as("timestamp"))
.selectExpr("CAST(key as STRING)", "CAST(value as STRING)", "CAST(timestamp as LONG)")
.write
.format("kafka")
.options(PropertiesParser.getKafkaConf())
.option("topic", topicName)
.save()