Spark: оптимизировать запись DataFrame в SQL Server - PullRequest
8 голосов
/ 16 апреля 2019

Я использую приведенный ниже код для записи DataFrame из 43 столбцов и около 2 000 000 строк в таблицу в SQL Server:

dataFrame
  .write
  .format("jdbc")
  .mode("overwrite")
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .option("url", url)
  .option("dbtable", tablename)
  .option("user", user)
  .option("password", password)
  .save()

К сожалению, хотя он работает для небольших фреймов данных, он либо очень медленный, либо для тайм-аутов используется тайм-аут. Любые советы о том, как его оптимизировать?

Я пробовал установить rewriteBatchedStatements=true

Спасибо.

Ответы [ 3 ]

6 голосов
/ 25 апреля 2019

Мы использовали библиотеку azure-sqldb-spark вместо встроенной функции экспорта Spark по умолчанию.Эта библиотека предоставляет вам метод bulkCopyToSqlDB, который является real пакетной вставкой и работает на намного быстрее.Это немного менее практично в использовании, чем встроенная функциональность, но по моему опыту оно все же стоит.

Мы используем его более или менее так:

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
import com.microsoft.azure.sqldb.spark.query._

val options = Map(
  "url"          -> "***",
  "databaseName" -> "***",
  "user"         -> "***",
  "password"     -> "***",
  "driver"       -> "com.microsoft.sqlserver.jdbc.SQLServerDriver"
)

// first make sure the table exists, with the correct column types
// and is properly cleaned up if necessary
val query = dropAndCreateQuery(df, "myTable")
val createConfig = Config(options ++ Map("QueryCustom" -> query))
spark.sqlContext.sqlDBQuery(createConfig)

val bulkConfig = Config(options ++ Map(
  "dbTable"           -> "myTable",
  "bulkCopyBatchSize" -> "20000",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.bulkCopyToSqlDB(bulkConfig)

Как вы можетевидите, мы генерируем запрос CREATE TABLE сами.Вы можете позволить библиотеке создать таблицу, но она просто сделает dataFrame.limit(0).write.sqlDB(config), который все еще может быть довольно неэффективным, вероятно, требует, чтобы вы кешировали DataFrame, и это не позволяет вам выбиратьSaveMode.

Также потенциально интересно: нам пришлось использовать ExclusionRule при добавлении этой библиотеки в нашу сборку sbt, иначе задача assembly не будет выполнена.

libraryDependencies += "com.microsoft.azure" % "azure-sqldb-spark" % "1.0.2" excludeAll(
  ExclusionRule(organization = "org.apache.spark")
)
5 голосов
/ 17 апреля 2019

Попробуйте добавить опцию batchsize к вашему выражению по крайней мере > 10000 (измените это значение соответственно, чтобы повысить производительность) и повторите запись.

Из документов по искру:

Размер пакета JDBC, который определяет сколько строк для вставки в туда и обратно . Это может помочь производительности на драйверах JDBC. Этот вариант относится только к письму. По по умолчанию 1000 .

Также стоит проверить:

  • numPartitions option для увеличения параллелизма (это также определяет максимальное количество одновременных соединений JDBC)

  • queryTimeout option для увеличения времени ожидания для опции записи.

1 голос
/ 29 апреля 2019

конвертирование данных в файлы CSV и копирование этих файлов CSV является вариантом для вас? мы автоматизировали этот процесс для больших таблиц и перенесли их в GCP в формате CSV. вместо того, чтобы читать это через JDBC.

...