Я пытаюсь получить инкрементные данные из базы данных Greenplum с помощью Spark. У нас есть инкрементные данные для каждой таблицы с ключом transactionId
.
Каждый transactionId
может содержать данные одной или нескольких строк. Все они хранятся в таблице метаданных: incKeyTable
.
У нас также есть последний перемещенный transactionID
каждой таблицы в другой таблице метаданных: incKeyLoads
. Эта таблица содержит одну запись на таблицу, которая является последней обновленной transactionId
в рабочей таблице.
Для того, чтобы узнать приращение transactionid
для каждой таблицы, я придумал следующую логику.
val spark = SparkSession.builder().master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
import spark.implicits._
Class.forName("org.postgresql.Driver").newInstance()
val tableStatus = s"select tablename, last_update_transaction_id from prod.incKeyLoads where source_system='DB2' and tablename='table1' and final_stage='PROD' and load='Successfull'"
val tableMetaDF = spark.read.format("jdbc").option("url", "url").option("dbtable", s"(${tableStatus}) as LoadedData").option("user", "user").option("password", "pwd").load()
val lutransIdTableMap = tableMetaDF.map(r => (r.getString(0),r.getLong(1))).collect().toMap
Теперь у меня есть последний обновленный идентификатор транзакции на карте scala, как показано ниже:
lutransIdTableMap.foreach(println) =
(table1 -> 123)
(table2 -> 113)
(table3 -> 122)
...
(tableN -> 098)
Чтобы узнать, какие последние transactionId
(инкрементные данные) поступают в Greenplum, я написал следующую логику для запроса таблицы метаданных: incKeyTable
Class.forName("com.pivotal.jdbc.GreenplumDriver").newInstance()
def sortLogIds(incTransIds:DataFrame, lastMovedTransId:Long, tablename: String):String = {
val returnMsg = "Full loads on this table"
val count = incTransIds.where($"load_type" === "FULLLOAD").count
if(count == 0) {
incTransIds.createOrReplaceTempView("incTransID")
val execQuery = s"SELECT transactionId from incTransID order by transactionId desc"
val incLogIdDf = spark.sql(execQuery)
incLogIdDf.show
val pushTransIds = "select * from schema.tablename where transactionID in(" + "'" + incLogIdDf.select($"transactionId").collect().map(_.getInt(0).toString).mkString("','") + "')"
pushLogIds
} else {
println("Full load count is greater than zero..")
returnMsg
}
}
var incTransIdMap = Map[String, String]()
lutransIdTableMap.keys.foreach(keyTable => if(lutransIdTableMap(keyTable) !=0) {
val tablename = keyTable.split("\\.") // Tablename = schema.tablename
val cdf = spark.read.format("jdbc").option("url", "url").option("dbtable", s"(select transactionId, load_type, source_system, tablename from schema.incKeyTable where source_system='DB2' and target_table='${tablename(1)}' and transactionId > ${lutransIdTableMap(keyTable)}) as controlTableDF").option("user", "user").option("password", "pwd").load()
incTransIdMap += (keyTable -> sortLogIds(cdf, lutransIdTableMap(keyTable), tablename(1)))
}
)
Этот метод работает, но он занимает так много времени, что я могу вытащить целые данные из greenplum на уровне таблицы, прежде чем этот поиск завершится, так как массив данных cdf огромен. Я пытался кэшировать dataframe: cdf, но он содержит почти 5 миллионов строк, и мне посоветовали не кэшировать такую большую таблицу для кэширования.
Я не мог придумать другого пути, где я мог бы сделать этот поиск быстрее. Может кто-нибудь сообщить мне идею сделать этот процесс эффективным?