Запись в SQL Server тип данных datetime из Scala / Spark - PullRequest
2 голосов
/ 02 июля 2019

Я пытаюсь выполнить массовую вставку в таблицу SQL Server из записной книжки, используя метод, подобный следующему:

Массовое копирование в базу данных SQL Azure или SQL Server

Это работает нормально, пока я не попытаюсь записать в столбец тип данных datetime. Таблица, в которую я пытаюсь записать, имеет следующую схему:

create table raw.HubDrg_TEST
(
  DrgKey varchar(64) not null,
  LoadDate datetime,
  LoadProcess varchar(255),
  RecordSource varchar(255),
  DrgCode varchar(255)
 )

Мой код Scala выглядит следующим образом:

//Get dataset for data in staging table
var stagedData: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", sqlDwUrlSmall)
  .option("tempDir", tempDir)
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("query", "select distinct CodeID as DrgCode, getdate() as LoadDate from StageMeditech.livendb_dbo_DAbsDrgs").load() 

//Get dataset for data in existing Hub
val existingHub: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", sqlDwUrlSmall)
  .option("tempDir", tempDir)
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("query", "Select DrgKey as ExistingDrgKey from raw.HubDrg_TEST")
  .load()

val sha_256 = udf((s: String) => { String.format("%032x", new BigInteger(1, MessageDigest.getInstance("SHA-256").digest(s.getBytes("UTF-8")))) })

//Add additional columns
stagedData = stagedData.withColumn("DrgKey",sha_256(col("DrgCode"))).withColumn("LoadProcess",lit("TestLoadProcess"))
                                   .withColumn("RecordSource",lit("TestRecordSource"))
//Join and filter out existing hub records
val dff = stagedData.join(existingHub, col("DrgKey")===col("ExistingDrgKey"), "left_outer").filter(existingHub.col("ExistingDrgKey").isNull).drop("ExistingDrgKey") 

//Bulk insert
val bulkCopyConfig = Config(Map( 
"url" -> dwServer, 
"databaseName" -> dwDatabase, 
"user" -> dwUser, 
"password" -> dwPass, 
"dbTable" -> "raw.HubDrg_TEST", 
"bulkCopyBatchSize" -> "2000", 
"bulkCopyTableLock" -> "false", 
"bulkCopyTimeout" -> "0" 
)) 

dff.bulkCopyToSqlDB(bulkCopyConfig) 

Проблема, с которой я сталкиваюсь, заключается в том, что значение даты и времени, которое я выбираю как getdate() as LoadDate, выдает мне эту ошибку при попытке вставить в вышеупомянутую таблицу: SqlNativeBufferBufferBulkCopy.WriteTdsDataToServer, error in OdbcDone: SqlState: 42000, NativeError: 4816, 'Error calling: bcp_done(this->GetHdbc()) | SQL Error Info: SrvrMsgState: 1, SrvrSeverity: 16, Error <1>: ErrorMsg: [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Invalid column type from bcp client for colid 2. | Error calling: pConn->Done() | state: FFFF, number: 58673, active connections: 9', Connection String: Driver={pdwodbc17e};app=TypeC01-DmsNativeWriter:DB66\mpdwsvc (13056)-ODBC;trusted_connection=yes;autotranslate=no;server=\\.\pipe\DB.66-a313018f1e5b\sql\query;database=Distribution_15

Даже при попытке не использовать значение datetime из запроса SQL Server и изменении значения LoadDate на: withColumn("LoadDate",current_timestamp()), при попытке использовать встроенную функцию current_timestamp в spark, она все равно не работает .

Я видел эту статью stackoverflow, которая похожа на вопрос, но все еще не ответила на вопрос. У кого-нибудь есть хороший пример того, как вставить в таблицу SQL Server с типом данных datetime с использованием библиотеки com.microsoft.azure.sqldb.spark.bulkcopy._?

Вот пример данных из dff.show()

    +-------+--------------------+--------------------+---------------+----------------+
    |DrgCode|            LoadDate|              DrgKey|    LoadProcess|    RecordSource|
    +-------+--------------------+--------------------+---------------+----------------+
    |    390|2019-07-02 09:05:...|48a1a756f2d83f1dc...|TestLoadProcess|TestRecordSource|
    |     18|2019-07-02 09:05:...|4ec9599fc203d176a...|TestLoadProcess|TestRecordSource|
    |    481|2019-07-02 09:05:...|51d089cdaf0c968c9...|TestLoadProcess|TestRecordSource|
    |    460|2019-07-02 09:05:...|841a05fd378a2c067...|TestLoadProcess|TestRecordSource|
    |    838|2019-07-02 09:05:...|cef5838d118dccd9d...|TestLoadProcess|TestRecordSource|
    |     61|2019-07-02 09:05:...|d029fa3a95e174a19...|TestLoadProcess|TestRecordSource|
    |    807|2019-07-02 09:05:...|fce86e339dc3131c4...|TestLoadProcess|TestRecordSource|
    |     44|2019-07-02 09:05:...|71ee45a3c0db9a986...|TestLoadProcess|TestRecordSource|
    |    267|2019-07-02 09:05:...|8acc23987b8960d83...|TestLoadProcess|TestRecordSource|
    |    222|2019-07-02 09:05:...|9b871512327c09ce9...|TestLoadProcess|TestRecordSource|
    |    934|2019-07-02 09:05:...|a8443b1426652157e...|TestLoadProcess|TestRecordSource|
    |    677|2019-07-02 09:05:...|2782526eaa0c5c254...|TestLoadProcess|TestRecordSource|
    |    701|2019-07-02 09:05:...|290a0b92873bdf4e4...|TestLoadProcess|TestRecordSource|
    |    441|2019-07-02 09:05:...|2dfe70c43208f52b9...|TestLoadProcess|TestRecordSource|
    |    439|2019-07-02 09:05:...|50a010ce24d089605...|TestLoadProcess|TestRecordSource|
    |    883|2019-07-02 09:05:...|3055e0d8130c7a197...|TestLoadProcess|TestRecordSource|
    |    947|2019-07-02 09:05:...|4d0198f4905a08812...|TestLoadProcess|TestRecordSource|
    |    369|2019-07-02 09:05:...|5f193b350c8aba488...|TestLoadProcess|TestRecordSource|
    |     21|2019-07-02 09:05:...|6f4b6612125fb3a0d...|TestLoadProcess|TestRecordSource|
    |    503|2019-07-02 09:05:...|7182dd431b5c8833e...|TestLoadProcess|TestRecordSource|
    +-------+--------------------+--------------------+---------------+----------------+
    only showing top 20 rows

dff:org.apache.spark.sql.DataFrame
DrgCode:string
LoadDate:timestamp
DrgKey:string
LoadProcess:string
RecordSource:string
...