Что такое ADF v2, эквивалентный WindowStart? - PullRequest
0 голосов
/ 14 сентября 2018

Мы осуществляем миграцию из ADF V1 в ADF V2.Наши трубопроводы должны работать ежедневно, и у нас есть эта часть V2.Трубопровод будет запущен в назначенное время.

Мой вопрос заключается в том, как передать время запланированного запуска триггера в условие where оператора select в операции копирования?Я подозреваю, что пропустил или неправильно прочитал некоторые документы, но я просто не могу понять это.

Независимо от того, что я пытаюсь, переменная, кажется, не оценивается во время выполнения.Таким образом, выражение where при запуске выглядит примерно так: where last_update> = '@ pipeline (). TriggerTime' И, очевидно, @pipeline (). TriggerTime - не дата.

Я вытащил этот списокпараметров из https://docs.microsoft.com/en-us/azure/data-factory/control-flow-system-variables

Это то, что работало в V1: "sqlReaderQuery": "$$ Text.Format ('выбрать столбцы из таблицы, где last_update> = \' {0: гггг-ММ-dd} \ '', WindowStart, WindowEnd) "

То, что я пробовал:

- Передача @trigger (). scheduleTime в качестве параметра в триггере для @ {formatDateTime (pipe (). parameters.startDate, 'yyyy-MM-dd')} в предложении where с параметром в конвейере, определенным как startDate типа string.

- Установка @pipeline (). TriggerTime asзначение по умолчанию для pipe (). parameters.startDate.

- Вызов @pipeline (). TriggerTime в предложении where.

Заранее большое спасибо.


РЕДАКТИРОВАТЬ:

Источник конвейера

{
    "name": "PL_ADLS_RAW_IDMTables_DAILY",
    "properties": {
        "activities": [
            {
                "name": "isStartDateNotNull",
                "description": "If the startDate parameter == 0 then run the full load. If the startDate parameter != 0 then run for >= startDate parameter.",
                "type": "IfCondition",
                "typeProperties": {
                    "expression": {
                        "value": "@equals(pipeline().parameters.startDate,'0')",
                        "type": "Expression"
                    },
                    "ifFalseActivities": [
                        {
                            "name": "copy_ENTITY_FULL",
                            "description": "Copies ENTITY from IDM Database to ADLS Raw Zone",
                            "type": "Copy",
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 60,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "typeProperties": {
                                "source": {
                                    "type": "SqlSource",
                                    "sqlReaderQuery": {
                                        "value": "SELECT [ENTITY_ID],[ENTITY_TYPE_ID],REPLACE(REPLACE([SHORT_NAME], CHAR(13),' '), CHAR(10),' '),REPLACE(REPLACE([ENTITY_NAME], CHAR(13),' '), CHAR(10),' '),REPLACE(REPLACE([ENTITY_DESCRIPTION], CHAR(13),' '), CHAR(10),' '),REPLACE(REPLACE([STATUS], CHAR(13),' '), CHAR(10),' '),[STATUS_DATE],[REVISION_ID],[CREATED_BY],[CREATION_DATE],[LAST_UPDATED_BY],[LAST_UPDATE_DATE],REPLACE(REPLACE([COMMENTS], CHAR(13),' '), CHAR(10),' ') FROM [dbo].[ENTITY] where LAST_UPDATE_DATE >= '@{formatDateTime(pipeline().parameters.startDate,'yyyy-MM-dd')}'",
                                        "type": "Expression"
                                    }
                                },
                                "sink": {
                                    "type": "AzureDataLakeStoreSink"
                                },
                                "enableStaging": false,
                                "dataIntegrationUnits": 0
                            },
                            "inputs": [
                                {
                                    "referenceName": "DS_IN_SQL_IDM_ENTITY",
                                    "type": "DatasetReference"
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "DS_OUT_ADLS_RAW_IDM_ENTITY",
                                    "type": "DatasetReference"
                                }
                            ]
                        }
                    ],
                    "ifTrueActivities": [
                        {
                            "name": "copy_ENTITY",
                            "description": "Copies ENTITY from IDM Database to ADLS Raw Zone",
                            "type": "Copy",
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 60,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "typeProperties": {
                                "source": {
                                    "type": "SqlSource",
                                    "sqlReaderQuery": {
                                        "value": "SELECT [ENTITY_ID],[ENTITY_TYPE_ID],REPLACE(REPLACE([SHORT_NAME], CHAR(13),' '), CHAR(10),' '),REPLACE(REPLACE([ENTITY_NAME], CHAR(13),' '), CHAR(10),' '),REPLACE(REPLACE([ENTITY_DESCRIPTION], CHAR(13),' '), CHAR(10),' '),REPLACE(REPLACE([STATUS], CHAR(13),' '), CHAR(10),' '),[STATUS_DATE],[REVISION_ID],[CREATED_BY],[CREATION_DATE],[LAST_UPDATED_BY],[LAST_UPDATE_DATE],REPLACE(REPLACE([COMMENTS], CHAR(13),' '), CHAR(10),' ') FROM [dbo].[ENTITY]",
                                        "type": "Expression"
                                    }
                                },
                                "sink": {
                                    "type": "AzureDataLakeStoreSink"
                                },
                                "enableStaging": false,
                                "dataIntegrationUnits": 0
                            },
                            "inputs": [
                                {
                                    "referenceName": "DS_IN_SQL_IDM_ENTITY",
                                    "type": "DatasetReference"
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "DS_OUT_ADLS_RAW_IDM_ENTITY",
                                    "type": "DatasetReference"
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "startDate": {
                "type": "String"
            }
        }
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}

Источник триггера:

{
    "name": "TR_SCHED_0800EST",
    "properties": {
        "description": "Daily 0800 EST",
        "runtimeState": "Stopped",
        "pipelines": [
            {
                "pipelineReference": {
                    "referenceName": "PL_ADLS_RAW_IDMTables_DAILY",
                    "type": "PipelineReference"
                },
                "parameters": {
                    "startDate": "@trigger().scheduledTime"
                }
            }
        ],
        "type": "ScheduleTrigger",
        "typeProperties": {
            "recurrence": {
                "frequency": "Day",
                "interval": 1,
                "startTime": "2018-08-30T13:00:00Z",
                "timeZone": "UTC",
                "schedule": {
                    "minutes": [
                        0
                    ],
                    "hours": [
                        13
                    ]
                }
            }
        }
    }
}

РЕДАКТИРОВАНИЕ

Это работает

enter image description here

Это не работает

enter image description here

1 Ответ

0 голосов
/ 14 сентября 2018

Передача @trigger (). ScheduleTime в качестве параметра в триггере в @ {formatDateTime (pipeline (). Parameters.startDate, 'yyyy-MM-dd')} в предложении where с параметром в определенном конвейере как startDate типа строка. это правильный путь.

Когда вы редактируете триггер в пользовательском интерфейсе, Убедитесь, что вы передаете значение следующим образом. enter image description here

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...