Низкая производительность при записи фрейма данных в базу данных Azure SQL с использованием PySpark JDBC - PullRequest
0 голосов
/ 13 января 2020

Я использую ниже JDBC URL в PySpark для записи data frame в Azure SQL Database. Однако я чувствую, что производительность этой операции записи не на должном уровне и может быть улучшена путем установки нескольких дополнительных свойств. Есть ли какие-либо обходные пути или какие-либо параметры, которые я могу добавить, чтобы улучшить производительность записи JDB C?

jdbcUrl = "jdbc:sqlserver://server.database.windows.net:1433;databaseName=test;enablePrepareOnFirstPreparedStatementCall=false"

Ниже приведен фактический оператор записи фрейма данных.

  data_frame.write \
            .mode('overwrite') \
            .format('jdbc') \
            .option('driver', jdbc_driver) \
            .option('user', user) \
            .option('password', password) \
            .option('url', jdbcUrl) \
            .option('dbtable', table + '_STG') \
            .save()

1 Ответ

0 голосов
/ 14 января 2020

Вы можете попробовать использовать Spark для SQL коннектора DB для записи данных в SQL базу данных с использованием массовой вставки в Scala, см. Раздел Write data to Azure SQL database or SQL Server using Bulk Insert из Azure официальное документ Accelerate real-time big data analytics with Spark connector for Azure SQL Database and SQL Server, как показано на скриншоте ниже.

enter image description here

Поэтому я думаю, что проблема для вас сейчас заключается в том, как передать фрейм данных PySpark data_frame в Python к коду в Scala. Вы можете использовать функцию registerTempTable кадра данных с именем таблицы, например temp_table, в качестве кода и рисунка ниже в блоке данных python блокнот.

# register a temp table for a dataframe in Python
data_frame.registerTempTable("temp_table")

%scala
val scalaDF = table("temp_table")

enter image description here

Затем запустить коды массовой вставки в Scala после %scala

%scala
import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

/** 
  Add column Metadata.
  If not specified, metadata is automatically added
  from the destination table, which may suffer performance.
*/
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)

val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "*********",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

scalaDF.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
...