Почему фабрика данных Azure, по-видимому, настаивает на добавлении DateTimes в виде строки? - PullRequest
4 голосов
/ 09 июля 2019

Я пытаюсь настроить фабрику данных Azure для копирования и денормализации моих данных из базы данных AzureSQL в другую базу данных AzureSQL для отчетов / целей BI с потоком данных, но я столкнулся с проблемой вставки дат.

Это определение моего потока данных.

{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable2",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "script": "\n\nsource(output(\n\t\tBucketId as string,\n\t\tStreamId as string,\n\t\tStreamIdOriginal as string,\n\t\tStreamRevision as integer,\n\t\tItems as integer,\n\t\tCommitId as string,\n\t\tCommitSequence as integer,\n\t\tCommitStamp as timestamp,\n\t\tCheckpointNumber as long,\n\t\tDispatched as boolean,\n\t\tHeaders as binary,\n\t\tPayload as binary\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tisolationLevel: 'READ_UNCOMMITTED',\n\tformat: 'table') ~> source1\nsource1 sink(allowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'table',\n\tdeletable:false,\n\tinsertable:true,\n\tupdateable:false,\n\tupsertable:false,\n\tmapColumn(\n\t\tBucketId,\n\t\tCommitStamp\n\t)) ~> sink1"
        }
    }
}

и это определения моего источника

{
    "name": "AzureSqlTable1",
    "properties": {
        "linkedServiceName": {
            "referenceName": "Source_Test",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [
            {
                "name": "BucketId",
                "type": "varchar"
            },
            {
                "name": "StreamId",
                "type": "char"
            },
            {
                "name": "StreamIdOriginal",
                "type": "nvarchar"
            },
            {
                "name": "StreamRevision",
                "type": "int",
                "precision": 10
            },
            {
                "name": "Items",
                "type": "tinyint",
                "precision": 3
            },
            {
                "name": "CommitId",
                "type": "uniqueidentifier"
            },
            {
                "name": "CommitSequence",
                "type": "int",
                "precision": 10
            },
            {
                "name": "CommitStamp",
                "type": "datetime2",
                "scale": 7
            },
            {
                "name": "CheckpointNumber",
                "type": "bigint",
                "precision": 19
            },
            {
                "name": "Dispatched",
                "type": "bit"
            },
            {
                "name": "Headers",
                "type": "varbinary"
            },
            {
                "name": "Payload",
                "type": "varbinary"
            }
        ],
        "typeProperties": {
            "tableName": "[dbo].[Commits]"
        }
    }
}

и наборы данных приемника

{
    "name": "AzureSqlTable2",
    "properties": {
        "linkedServiceName": {
            "referenceName": "Dest_Test",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [],
        "typeProperties": {
            "tableName": "dbo.Test2"
        }
    }
}

При запуске моего конвейера с потоком данных я получаю следующую ошибку:

Activity dataflow1 failed: DF-EXEC-1 Conversion failed when converting date and/or time from character string.
com.microsoft.sqlserver.jdbc.SQLServerException: Conversion failed when converting date and/or time from character string.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:258)
    at com.microsoft.sqlserver.jdbc.TDSTokenHandler.onEOF(tdsparser.java:256)
    at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:108)
    at com.microsoft.sqlserver.jdbc.TDSParser.parse(tdsparser.java:28)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.doInsertBulk(SQLServerBulkCopy.java:1611)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.access$200(SQLServerBulkCopy.java:58)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy$1InsertBulk.doExecute(SQLServerBulkCopy.java:709)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7151)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2478)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.sendBulkLoadBCP(SQLServerBulkCopy.java:739)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:1684)
    at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:669)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions.com$microsoft$azure$sqldb$spark$connect$DataFrameFunctions$$bulkCopy(DataFrameFunctions.scala:127)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
    at com.microsoft.azure.sqldb.spark.connect.DataFrameFunctions$$anonfun$bulkCopyToSqlDB$1.apply(DataFrameFunctions.scala:72)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:948)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:948)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2226)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2226)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:124)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:459)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1401)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Мой журнал аудита SQL Azure показывает следующее невыполненное утверждение (что неудивительно, учитывая, что в качестве типа для [CommitStamp] используется VARCHAR(50):

INSERT BULK dbo.T_301fcb5e4a4148d4a48f2943011b2f04 (
  [BucketId] NVARCHAR(MAX), 
  [CommitStamp] VARCHAR(50), 
  [StreamId] NVARCHAR(MAX), 
  [StreamIdOriginal] NVARCHAR(MAX),
  [StreamRevision] INT,
  [Items] INT,
  [CommitId] NVARCHAR(MAX),
  [CommitSequence] INT, 
  [CheckpointNumber] BIGINT, 
  [Dispatched] BIT,
  [Headers] VARBINARY(MAX),
  [Payload] VARBINARY(MAX),
  [r8e440f7252bb401b9ead107597de6293] INT) 
with (ROWS_PER_BATCH = 4096, TABLOCK)

Я понятия не имею, почему это происходит. Похоже, информация о схеме верна, но каким-то образом кажется, что фабрика данных / поток данных хочет вставить CommitStamp как строковый тип.

По запросу, вывод из потока данных / кода / плана:



source(output(
        BucketId as string,
        StreamId as string,
        StreamIdOriginal as string,
        StreamRevision as integer,
        Items as integer,
        CommitId as string,
        CommitSequence as integer,
        CommitStamp as timestamp,
        CheckpointNumber as long,
        Dispatched as boolean,
        Headers as binary,
        Payload as binary
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    isolationLevel: 'READ_UNCOMMITTED',
    format: 'table',
    schemaName: '[dbo]',
    tableName: '[Commits]',
    store: 'sqlserver',
    server: 'sign2025-sqldata.database.windows.net',
    database: 'SignPath.Application',
    user: 'Sign2025Admin',
    password: '**********') ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false,
    format: 'table',
    deletable:false,
    insertable:true,
    updateable:false,
    upsertable:false,
    mapColumn(
        BucketId,
        CommitStamp
    ),
    schemaName: 'dbo',
    tableName: 'Test2',
    store: 'sqlserver',
    server: 'sign2025-sqldata.database.windows.net',
    database: 'SignPath.Reporting',
    user: 'Sign2025Admin',
    password: '**********') ~> sink1

Ответы [ 2 ]

1 голос
/ 11 июля 2019

Похоже, ваш набор данных Sink определяет myTime в виде строки:

раковина (вход ( ID как целое число, tName as string, myTime как строка )

Можете ли вы изменить это на метку времени или дату, в зависимости от того, как бы вы хотели это сделать?

В качестве альтернативы, вы можете поместить данные во временную промежуточную таблицу в SQL, установив «Пересоздать таблицу» в Sink и позволить ADF генерировать новое определение таблицы на лету, используя типы данных ваших сопоставленных полей в потоке данных.

1 голос
/ 09 июля 2019

Я создал поток данных для копирования данных из базы данных SQL Azure в другую базу данных SQL Azure. Удалось преобразовать datatime2 в VARCHAR(50).

Это определение моего потока данных:

{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "DestinationDataset_sto",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "DestinationDataset_mex",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "script": "\n\nsource(output(\n\t\tID as integer,\n\t\ttName as string,\n\t\tmyTime as timestamp\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tisolationLevel: 'READ_UNCOMMITTED',\n\tformat: 'table') ~> source1\nsource1 sink(input(\n\t\tID as integer,\n\t\ttName as string,\n\t\tmyTime as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false,\n\tformat: 'table',\n\tdeletable:false,\n\tinsertable:true,\n\tupdateable:false,\n\tupsertable:false) ~> sink1"
        }
    }
}

Определения моего источника:

{
    "name": "DestinationDataset_sto",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureSqlDatabase1",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [
            {
                "name": "ID",
                "type": "int",
                "precision": 10
            },
            {
                "name": "tName",
                "type": "varchar"
            },
            {
                "name": "myTime",
                "type": "datetime2",
                "scale": 7
            }
        ],
        "typeProperties": {
            "tableName": "[dbo].[demo]"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

Настройки мойки:

{
    "name": "DestinationDataset_mex",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureSqlDatabase1",
            "type": "LinkedServiceReference"
        },
        "annotations": [],
        "type": "AzureSqlTable",
        "schema": [
            {
                "name": "ID",
                "type": "int",
                "precision": 10
            },
            {
                "name": "tName",
                "type": "varchar"
            },
            {
                "name": "myTime",
                "type": "varchar"
            }
        ],
        "typeProperties": {
            "tableName": "[dbo].[demo1]"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

Вот мои шаги потока данных. enter image description here

Шаг 1: Настройки источника: enter image description here enter image description here

Шаг 2: Настройки раковины: enter image description here enter image description here enter image description here enter image description here

Выполнение выполнено успешно: enter image description here

Таблицы demo и demo1 почти имеют одинаковую схему за исключением myTime.

Моя исходная таблица и ее данные:

enter image description here

Моя таблица приемников и данные, скопированные с demo:

enter image description here

План потока данных:

source(output(
        ID as integer,
        tName as string,
        myTime as timestamp
    ),
    allowSchemaDrift: true,
    validateSchema: true,
    isolationLevel: 'SERIALIZABLE',
    format: 'table',
    schemaName: '[dbo]',
    tableName: '[demo]',
    store: 'sqlserver',
    server: '****.database.windows.net',
    database: '****',
    user: 'ServerAdmin',
    password: '**********') ~> source1
source1 sink(input(
        ID as integer,
        tName as string,
        myTime as string
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    format: 'table',
    deletable:false,
    insertable:true,
    updateable:false,
    upsertable:false,
    schemaName: '[dbo]',
    tableName: '[demo1]',
    store: 'sqlserver',
    server: '****.database.windows.net',
    database: '****',
    user: 'ServerAdmin',
    password: '**********') ~> sink1

Update1:

Я вручную создаю таблицу приемников и обнаружил, что:

Поток данных может конвертировать datatime2 в VARCHAR() (может быть NVARCHAR()), date, datetimeoffset.

Когда я пытаюсь ввести тип даты time, datetime, datetime2, smalldatetime, поток данных всегда выдает ошибку:

"message": "DF-EXEC-1 Conversion failed when converting date and/or time from character 

Обновление 2019-7-11:

Я обратился за помощью в службу поддержки Azure, и они ответили мне: это ошибка потока данных, и пока нет решения. enter image description here

Обновление 2019-7-12:

Я протестировал поддержку Azure, и они подтверждают, что это ошибка. Вот новое письмо: enter image description here

Они также сказали мне, что исправление уже сделано, и оно будет развернуто в следующем поезде развертывания. Это может быть конец следующей недели .

Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...