Я пытаюсь выполнить массовую вставку в таблицу SQL Server из записной книжки, используя метод, подобный следующему:
Массовое копирование в базу данных SQL Azure или SQL Server
Это работает нормально, пока я не попытаюсь записать в столбец тип данных datetime. Таблица, в которую я пытаюсь записать, имеет следующую схему:
create table raw.HubDrg_TEST
(
DrgKey varchar(64) not null,
LoadDate datetime,
LoadProcess varchar(255),
RecordSource varchar(255),
DrgCode varchar(255)
)
Мой код Scala выглядит следующим образом:
//Get dataset for data in staging table
var stagedData: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", sqlDwUrlSmall)
.option("tempDir", tempDir)
.option("forwardSparkAzureStorageCredentials", "true")
.option("query", "select distinct CodeID as DrgCode, getdate() as LoadDate from StageMeditech.livendb_dbo_DAbsDrgs").load()
//Get dataset for data in existing Hub
val existingHub: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", sqlDwUrlSmall)
.option("tempDir", tempDir)
.option("forwardSparkAzureStorageCredentials", "true")
.option("query", "Select DrgKey as ExistingDrgKey from raw.HubDrg_TEST")
.load()
val sha_256 = udf((s: String) => { String.format("%032x", new BigInteger(1, MessageDigest.getInstance("SHA-256").digest(s.getBytes("UTF-8")))) })
//Add additional columns
stagedData = stagedData.withColumn("DrgKey",sha_256(col("DrgCode"))).withColumn("LoadProcess",lit("TestLoadProcess"))
.withColumn("RecordSource",lit("TestRecordSource"))
//Join and filter out existing hub records
val dff = stagedData.join(existingHub, col("DrgKey")===col("ExistingDrgKey"), "left_outer").filter(existingHub.col("ExistingDrgKey").isNull).drop("ExistingDrgKey")
//Bulk insert
val bulkCopyConfig = Config(Map(
"url" -> dwServer,
"databaseName" -> dwDatabase,
"user" -> dwUser,
"password" -> dwPass,
"dbTable" -> "raw.HubDrg_TEST",
"bulkCopyBatchSize" -> "2000",
"bulkCopyTableLock" -> "false",
"bulkCopyTimeout" -> "0"
))
dff.bulkCopyToSqlDB(bulkCopyConfig)
Проблема, с которой я сталкиваюсь, заключается в том, что значение даты и времени, которое я выбираю как getdate() as LoadDate
, выдает мне эту ошибку при попытке вставить в вышеупомянутую таблицу: SqlNativeBufferBufferBulkCopy.WriteTdsDataToServer, error in OdbcDone: SqlState: 42000, NativeError: 4816, 'Error calling: bcp_done(this->GetHdbc()) | SQL Error Info: SrvrMsgState: 1, SrvrSeverity: 16, Error <1>: ErrorMsg: [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Invalid column type from bcp client for colid 2. | Error calling: pConn->Done() | state: FFFF, number: 58673, active connections: 9', Connection String: Driver={pdwodbc17e};app=TypeC01-DmsNativeWriter:DB66\mpdwsvc (13056)-ODBC;trusted_connection=yes;autotranslate=no;server=\\.\pipe\DB.66-a313018f1e5b\sql\query;database=Distribution_15
Даже при попытке не использовать значение datetime из запроса SQL Server и изменении значения LoadDate на: withColumn("LoadDate",current_timestamp())
, при попытке использовать встроенную функцию current_timestamp в spark, она все равно не работает .
Я видел эту статью stackoverflow, которая похожа на вопрос, но все еще не ответила на вопрос. У кого-нибудь есть хороший пример того, как вставить в таблицу SQL Server с типом данных datetime с использованием библиотеки com.microsoft.azure.sqldb.spark.bulkcopy._
?
Вот пример данных из dff.show()
+-------+--------------------+--------------------+---------------+----------------+
|DrgCode| LoadDate| DrgKey| LoadProcess| RecordSource|
+-------+--------------------+--------------------+---------------+----------------+
| 390|2019-07-02 09:05:...|48a1a756f2d83f1dc...|TestLoadProcess|TestRecordSource|
| 18|2019-07-02 09:05:...|4ec9599fc203d176a...|TestLoadProcess|TestRecordSource|
| 481|2019-07-02 09:05:...|51d089cdaf0c968c9...|TestLoadProcess|TestRecordSource|
| 460|2019-07-02 09:05:...|841a05fd378a2c067...|TestLoadProcess|TestRecordSource|
| 838|2019-07-02 09:05:...|cef5838d118dccd9d...|TestLoadProcess|TestRecordSource|
| 61|2019-07-02 09:05:...|d029fa3a95e174a19...|TestLoadProcess|TestRecordSource|
| 807|2019-07-02 09:05:...|fce86e339dc3131c4...|TestLoadProcess|TestRecordSource|
| 44|2019-07-02 09:05:...|71ee45a3c0db9a986...|TestLoadProcess|TestRecordSource|
| 267|2019-07-02 09:05:...|8acc23987b8960d83...|TestLoadProcess|TestRecordSource|
| 222|2019-07-02 09:05:...|9b871512327c09ce9...|TestLoadProcess|TestRecordSource|
| 934|2019-07-02 09:05:...|a8443b1426652157e...|TestLoadProcess|TestRecordSource|
| 677|2019-07-02 09:05:...|2782526eaa0c5c254...|TestLoadProcess|TestRecordSource|
| 701|2019-07-02 09:05:...|290a0b92873bdf4e4...|TestLoadProcess|TestRecordSource|
| 441|2019-07-02 09:05:...|2dfe70c43208f52b9...|TestLoadProcess|TestRecordSource|
| 439|2019-07-02 09:05:...|50a010ce24d089605...|TestLoadProcess|TestRecordSource|
| 883|2019-07-02 09:05:...|3055e0d8130c7a197...|TestLoadProcess|TestRecordSource|
| 947|2019-07-02 09:05:...|4d0198f4905a08812...|TestLoadProcess|TestRecordSource|
| 369|2019-07-02 09:05:...|5f193b350c8aba488...|TestLoadProcess|TestRecordSource|
| 21|2019-07-02 09:05:...|6f4b6612125fb3a0d...|TestLoadProcess|TestRecordSource|
| 503|2019-07-02 09:05:...|7182dd431b5c8833e...|TestLoadProcess|TestRecordSource|
+-------+--------------------+--------------------+---------------+----------------+
only showing top 20 rows
dff:org.apache.spark.sql.DataFrame
DrgCode:string
LoadDate:timestamp
DrgKey:string
LoadProcess:string
RecordSource:string