Вы можете попробовать использовать Spark для SQL коннектора DB для записи данных в SQL базу данных с использованием массовой вставки в Scala, см. Раздел Write data to Azure SQL database or SQL Server using Bulk Insert
из Azure официальное документ Accelerate real-time big data analytics with Spark connector for Azure SQL Database and SQL Server
, как показано на скриншоте ниже.
Поэтому я думаю, что проблема для вас сейчас заключается в том, как передать фрейм данных PySpark data_frame
в Python к коду в Scala. Вы можете использовать функцию registerTempTable
кадра данных с именем таблицы, например temp_table
, в качестве кода и рисунка ниже в блоке данных python блокнот.
# register a temp table for a dataframe in Python
data_frame.registerTempTable("temp_table")
%scala
val scalaDF = table("temp_table")
Затем запустить коды массовой вставки в Scala после %scala
%scala
import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
/**
Add column Metadata.
If not specified, metadata is automatically added
from the destination table, which may suffer performance.
*/
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)
val bulkCopyConfig = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"user" -> "username",
"password" -> "*********",
"dbTable" -> "dbo.Clients",
"bulkCopyBatchSize" -> "2500",
"bulkCopyTableLock" -> "true",
"bulkCopyTimeout" -> "600"
))
scalaDF.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)