Я пытаюсь настроить фабрику данных Azure для копирования и денормализации моих данных из базы данных AzureSQL в другую базу данных AzureSQL для отчетов / целей BI с потоком данных, но я столкнулся с проблемой вставки дат.
Это определение моего потока данных.
{
"name": "dataflow1",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "AzureSqlTable1",
"type": "DatasetReference"
},
"name": "source1"
}
],
"sinks": [
{
"dataset": {
"referenceName": "AzureSqlTable2",
"type": "DatasetReference"
},
"name": "sink1"
}
],
"script": "\n\nsource(output(\n\t\tBucketId as string,\n\t\tStreamId as string,\n\t\tStreamIdOriginal as string,\n\t\tStreamRevision as integer,\n\t\tItems as integer,\n\t\tCommitId as string,\n\t\tCommitSequence as integer,\n\t\tCommitStamp as timestamp,\n\t\tCheckpointNumber as long,\n\t\tDispatched as boolean,\n\t\tHeaders as binary,\n\t\tPayload as binary\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tisolationLevel: 'READ_UNCOMMITTED',\n\tformat: 'table') ~> source1\nsource1 sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'table',\n\tdeletable:false,\n\tinsertable:true,\n\tupdateable:false,\n\tupsertable:false,\n\tmapColumn(\n\t\tBucketId,\n\t\tCommitStamp\n\t)) ~> sink1"
}
}
}
и это определения моего источника
{
"name": "AzureSqlTable1",
"properties": {
"linkedServiceName": {
"referenceName": "Source_Test",
"type": "LinkedServiceReference"
},
"annotations": [],
"type": "AzureSqlTable",
"schema": [
{
"name": "BucketId",
"type": "varchar"
},
{
"name": "StreamId",
"type": "char"
},
{
"name": "StreamIdOriginal",
"type": "nvarchar"
},
{
"name": "StreamRevision",
"type": "int",
"precision": 10
},
{
"name": "Items",
"type": "tinyint",
"precision": 3
},
{
"name": "CommitId",
"type": "uniqueidentifier"
},
{
"name": "CommitSequence",
"type": "int",
"precision": 10
},
{
"name": "CommitStamp",
"type": "datetime2",
"scale": 7
},
{
"name": "CheckpointNumber",
"type": "bigint",
"precision": 19
},
{
"name": "Dispatched",
"type": "bit"
},
{
"name": "Headers",
"type": "varbinary"
},
{
"name": "Payload",
"type": "varbinary"
}
],
"typeProperties": {
"tableName": "[dbo].[Commits]"
}
}
}
и наборы данных приемника
{
"name": "AzureSqlTable2",
"properties": {
"linkedServiceName": {
"referenceName": "Dest_Test",
"type": "LinkedServiceReference"
},
"annotations": [],
"type": "AzureSqlTable",
"schema": [],
"typeProperties": {
"tableName": "dbo.Test2"
}
}
}
При запуске моего конвейера с потоком данных я получаю следующую ошибку:
Activity dataflow1 failed: DF-EXEC-1 Conversion failed when converting date and/or time from character string.
com.microsoft.sqlserver.jdbc.SQLServerException: Conversion failed when converting date and/or time from character string.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:258)
at com.microsoft.sqlserver.jdbc.TDSTokenHandler.onEOF(tdsparser.java:256)
at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:108)
at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:28)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.doInsertBulk(SQLServerBulkCopy.java:1611)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.access$200(SQLServerBulkCopy.java:58)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy$1InsertBulk.doExecute(SQLServerBulkCopy.java:709)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2478)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.sendBulkLoadBCP(SQLServerBulkCopy.java:739)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:1684)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:669)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions.com$microsoft$azure$sqldb$spark$connect$DataFrameFunctions$$bulkCopy(DataFrameFunctions.scala:127)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:948)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:948)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2226)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2226)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:124)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:459)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1401)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Мой журнал аудита SQL Azure показывает следующее невыполненное утверждение (что неудивительно, учитывая, что в качестве типа для [CommitStamp]
используется VARCHAR(50)
:
INSERT BULK dbo.T_301fcb5e4a4148d4a48f2943011b2f04 (
[BucketId] NVARCHAR(MAX),
[CommitStamp] VARCHAR(50),
[StreamId] NVARCHAR(MAX),
[StreamIdOriginal] NVARCHAR(MAX),
[StreamRevision] INT,
[Items] INT,
[CommitId] NVARCHAR(MAX),
[CommitSequence] INT,
[CheckpointNumber] BIGINT,
[Dispatched] BIT,
[Headers] VARBINARY(MAX),
[Payload] VARBINARY(MAX),
[r8e440f7252bb401b9ead107597de6293] INT)
with (ROWS_PER_BATCH = 4096, TABLOCK)
Я понятия не имею, почему это происходит. Похоже, информация о схеме верна, но каким-то образом кажется, что фабрика данных / поток данных хочет вставить CommitStamp
как строковый тип.
По запросу, вывод из потока данных / кода / плана:
source(output(
BucketId as string,
StreamId as string,
StreamIdOriginal as string,
StreamRevision as integer,
Items as integer,
CommitId as string,
CommitSequence as integer,
CommitStamp as timestamp,
CheckpointNumber as long,
Dispatched as boolean,
Headers as binary,
Payload as binary
),
allowSchemaDrift: true,
validateSchema: false,
isolationLevel: 'READ_UNCOMMITTED',
format: 'table',
schemaName: '[dbo]',
tableName: '[Commits]',
store: 'sqlserver',
server: 'sign2025-sqldata.database.windows.net',
database: 'SignPath.Application',
user: 'Sign2025Admin',
password: '**********') ~> source1
source1 sink(allowSchemaDrift: true,
validateSchema: false,
format: 'table',
deletable:false,
insertable:true,
updateable:false,
upsertable:false,
mapColumn(
BucketId,
CommitStamp
),
schemaName: 'dbo',
tableName: 'Test2',
store: 'sqlserver',
server: 'sign2025-sqldata.database.windows.net',
database: 'SignPath.Reporting',
user: 'Sign2025Admin',
password: '**********') ~> sink1