Оптимизируйте несколько запросов JDBC с помощью Spark - PullRequest
1 голос
/ 17 марта 2019

Я пытаюсь получить инкрементные данные из базы данных Greenplum с помощью Spark. У нас есть инкрементные данные для каждой таблицы с ключом transactionId. Каждый transactionId может содержать данные одной или нескольких строк. Все они хранятся в таблице метаданных: incKeyTable. У нас также есть последний перемещенный transactionID каждой таблицы в другой таблице метаданных: incKeyLoads. Эта таблица содержит одну запись на таблицу, которая является последней обновленной transactionId в рабочей таблице. Для того, чтобы узнать приращение transactionid для каждой таблицы, я придумал следующую логику.

val spark = SparkSession.builder().master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
import spark.implicits._
Class.forName("org.postgresql.Driver").newInstance()
val tableStatus = s"select tablename, last_update_transaction_id from prod.incKeyLoads where source_system='DB2' and tablename='table1' and final_stage='PROD' and load='Successfull'"
val tableMetaDF = spark.read.format("jdbc").option("url", "url").option("dbtable", s"(${tableStatus}) as LoadedData").option("user", "user").option("password", "pwd").load()
val lutransIdTableMap   = tableMetaDF.map(r => (r.getString(0),r.getLong(1))).collect().toMap

Теперь у меня есть последний обновленный идентификатор транзакции на карте scala, как показано ниже:

lutransIdTableMap.foreach(println) =
(table1 -> 123)
(table2 -> 113)
(table3 -> 122)
...
(tableN -> 098)

Чтобы узнать, какие последние transactionId (инкрементные данные) поступают в Greenplum, я написал следующую логику для запроса таблицы метаданных: incKeyTable

Class.forName("com.pivotal.jdbc.GreenplumDriver").newInstance()
def sortLogIds(incTransIds:DataFrame, lastMovedTransId:Long, tablename: String):String = {
    val returnMsg = "Full loads on this table"
    val count = incTransIds.where($"load_type" === "FULLLOAD").count
    if(count == 0) {
      incTransIds.createOrReplaceTempView("incTransID")
      val execQuery  = s"SELECT transactionId from incTransID order by transactionId desc"
      val incLogIdDf = spark.sql(execQuery)
      incLogIdDf.show
      val pushTransIds = "select * from schema.tablename where transactionID in(" + "'" + incLogIdDf.select($"transactionId").collect().map(_.getInt(0).toString).mkString("','") + "')"
      pushLogIds
    } else {
      println("Full load count is greater than zero..")
      returnMsg
    }
}

var incTransIdMap = Map[String, String]()
lutransIdTableMap.keys.foreach(keyTable => if(lutransIdTableMap(keyTable) !=0) {
    val tablename = keyTable.split("\\.")   // Tablename = schema.tablename
    val cdf = spark.read.format("jdbc").option("url", "url").option("dbtable", s"(select transactionId, load_type, source_system, tablename from schema.incKeyTable where source_system='DB2' and target_table='${tablename(1)}' and transactionId > ${lutransIdTableMap(keyTable)}) as controlTableDF").option("user", "user").option("password", "pwd").load()
    incTransIdMap += (keyTable -> sortLogIds(cdf, lutransIdTableMap(keyTable), tablename(1)))
    }
)

Этот метод работает, но он занимает так много времени, что я могу вытащить целые данные из greenplum на уровне таблицы, прежде чем этот поиск завершится, так как массив данных cdf огромен. Я пытался кэшировать dataframe: cdf, но он содержит почти 5 миллионов строк, и мне посоветовали не кэшировать такую ​​большую таблицу для кэширования. Я не мог придумать другого пути, где я мог бы сделать этот поиск быстрее. Может кто-нибудь сообщить мне идею сделать этот процесс эффективным?

1 Ответ

1 голос
/ 17 марта 2019

Код в вопросе не может быть тем, что вы на самом деле используете, потому что вы возвращаете pushLogIds в sortLogIds, который никогда не определяется, и вы выбираете из schema.tablename вместо s"schema.$tablename". Это затрудняет точно знать, что происходит ...

Тем не менее, с точки зрения структуры обработки больших данных, у вашего подхода есть несколько потенциальных проблем:

  1. Итерации вместо преобразований UNION. При прочих равных условиях вместо того, чтобы выдавать много отдельных запросов и затем собирать результаты в драйвере, предпочтительно подумать о способах выдачи одного запроса. , Вот как оптимизатор может помочь. В вашем случае рассмотрите возможность создания представления Greenplum, объединяющего все таблицы в lutransIdTableMap.

  2. Действия вместо преобразований объединения. В sortLogIds вы выполняете действие count только для того, чтобы решить, следует ли выполнять дополнительные запросы. При прочих равных условиях лучше выразить это через преобразование объединения, чтобы отложить запуск действия. Позже вы выпускаете show, что под обложками эквивалентно take(n). Это действие действительно необходимо? Позже вы используете collect для генерации выражения SQL для использования в операторе IN. Это еще один пример, где вы должны использовать вместо этого соединение. В общем, вы выполняете один и тот же базовый запрос Greenplum, представленный incTransId три раза. Если вы настаиваете на этом типе обработки, вам следует каким-то образом упорствовать incTransId.

  3. Сборка SQL вместо DSL используется. Как правило, если вы используете Spark через язык программирования, а не через SparkSQL, вы должны использовать DSL вместо сборки выражений SQL в виде строк. , Таким образом, вам не нужно переопределять представления и т. Д.

Здесь слишком много всего, что нужно исправить, не имея полного кода и не зная точной схемы Greenplum + стратегии распространения + индексов (если таковые имеются) и размеров данных. Однако вышесказанное должно дать вам отправную точку.

Вот пример того, как перейти от использования итерации к объединениям.

val allData = Map("table1" -> 101, "table2" -> 212)
  .map { case (tableName, id) =>
    spark.table(tableName).withColumn("id", lit(id))
  }
  .reduceLeft(_ union _)

Вот пример того, как использовать объединения вместо collect + IN.

val allIds = spark.range(100)
val myIds = spark.createDataset(Seq(11, 33, 55, 77, 99)).toDF("id")
allIds.where('id.isin(myIds.as[Int].collect: _*)) // premature action
allIds.join(myIds, Seq("id")) // inner join delays action

В приведенном выше примере также показано, как вы можете использовать наборы данных с collect, заменив, например, .collect().map(_.getInt(0).toString) на .as[String].collect, что проще, безопаснее и быстрее.

Надеюсь, это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...