Мы использовали библиотеку azure-sqldb-spark вместо встроенной функции экспорта Spark по умолчанию.Эта библиотека предоставляет вам метод bulkCopyToSqlDB
, который является real пакетной вставкой и работает на намного быстрее.Это немного менее практично в использовании, чем встроенная функциональность, но по моему опыту оно все же стоит.
Мы используем его более или менее так:
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
import com.microsoft.azure.sqldb.spark.query._
val options = Map(
"url" -> "***",
"databaseName" -> "***",
"user" -> "***",
"password" -> "***",
"driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver"
)
// first make sure the table exists, with the correct column types
// and is properly cleaned up if necessary
val query = dropAndCreateQuery(df, "myTable")
val createConfig = Config(options ++ Map("QueryCustom" -> query))
spark.sqlContext.sqlDBQuery(createConfig)
val bulkConfig = Config(options ++ Map(
"dbTable" -> "myTable",
"bulkCopyBatchSize" -> "20000",
"bulkCopyTableLock" -> "true",
"bulkCopyTimeout" -> "600"
))
df.bulkCopyToSqlDB(bulkConfig)
Как вы можетевидите, мы генерируем запрос CREATE TABLE
сами.Вы можете позволить библиотеке создать таблицу, но она просто сделает dataFrame.limit(0).write.sqlDB(config)
, который все еще может быть довольно неэффективным, вероятно, требует, чтобы вы кешировали DataFrame
, и это не позволяет вам выбиратьSaveMode
.
Также потенциально интересно: нам пришлось использовать ExclusionRule
при добавлении этой библиотеки в нашу сборку sbt, иначе задача assembly
не будет выполнена.
libraryDependencies += "com.microsoft.azure" % "azure-sqldb-spark" % "1.0.2" excludeAll(
ExclusionRule(organization = "org.apache.spark")
)