Создание конвейера с запланированным триггером с ADFV2 - PullRequest
0 голосов
/ 13 ноября 2018

Я пытаюсь перенести конвейер, который уже существует в ADFV1, в ADFV2 и у меня есть некоторые проблемы с концепцией триггеров. В моем конвейере есть две активизации: первая - это операция аналитики озера данных Azure, а вторая - операция копирования. Первое действие запускает сценарий usql, в котором данные считываются из разделенной папки / {yyyy} / {MM} / {dd} /, обрабатываются и записываются в папку / {yyyy} - {MM} - {dd} /. Вот некоторые JSON-файлы из моей фабрики (конвейер, триггер и наборы данных).

Pipeline:

{
"name": "StreamCompressionBlob2SQL",
"properties": {
    "activities": [
        {
            "name": "compress",
            "type": "DataLakeAnalyticsU-SQL",
            "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
            },
            "typeProperties": {
                "scriptPath": "d00044653/azure-configurations/usql-scripts/stream/compression.usql",
                "scriptLinkedService": {
                    "referenceName": "AzureBlobStorage",
                    "type": "LinkedServiceReference"
                },
                "parameters": {
                    "Year": {
                        "value": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')",
                        "type": "Expression"
                    },
                    "Month": {
                        "value": "@formatDateTime(pipeline().parameters.windowStartTime,'MM')",
                        "type": "Expression"
                    },
                    "Day": {
                        "value": "@formatDateTime(pipeline().parameters.windowStartTime,'dd')",
                        "type": "Expression"
                    }
                }
            },
            "linkedServiceName": {
                "referenceName": "AzureDataLakeAnalytics1",
                "type": "LinkedServiceReference"
            }
        },
        {
            "name": "Blob2SQL",
            "type": "Copy",
            "dependsOn": [
                {
                    "activity": "compress",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
            ],
            "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
            },
            "typeProperties": {
                "source": {
                    "type": "BlobSource",
                    "recursive": true
                },
                "sink": {
                    "type": "SqlSink",
                    "writeBatchSize": 10000
                },
                "enableStaging": false,
                "dataIntegrationUnits": 0,
                "translator": {
                    "type": "TabularTranslator",
                    "columnMappings": {
                        "tag": "TAG",
                        "device_id": "DEVICE_ID",
                        "system_id": "SYSTEM_ID",
                        "utc": "UTC",
                        "ts": "TS",
                        "median": "MEDIAN",
                        "min": "MIN",
                        "max": "MAX",
                        "avg": "AVG",
                        "stdev": "STDEV",
                        "first_value": "FIRST_VALUE",
                        "last_value": "LAST_VALUE",
                        "message_count": "MESSAGE_COUNT"
                    }
                }
            },
            "inputs": [
                {
                    "referenceName": "AzureBlobDataset_COMPRESSED_ASA_v1",
                    "type": "DatasetReference"
                }
            ],
            "outputs": [
                {
                    "referenceName": "AzureSQLDataset_T_ASSET_MONITORING_WARM_ASA_v1",
                    "type": "DatasetReference"
                }
            ]
        }
    ],
    "parameters": {
        "windowStartTime": {
            "type": "String"
        }
    }
}

}

Trigger:

{
"name": "trigger1",
"properties": {
    "runtimeState": "Started",
    "pipelines": [
        {
            "pipelineReference": {
                "referenceName": "StreamCompressionBlob2SQL",
                "type": "PipelineReference"
            },
            "parameters": {
                "windowStartTime": "@trigger().scheduledTime"
            }
        }
    ],
    "type": "ScheduleTrigger",
    "typeProperties": {
        "recurrence": {
            "frequency": "Day",
            "interval": 1,
            "startTime": "2018-08-17T10:46:00.000Z",
            "endTime": "2018-11-04T10:46:00.000Z",
            "timeZone": "UTC"
        }
    }
}

}

Входной набор данных для операции копирования:

{
"name": "AzureBlobDataset_COMPRESSED_ASA_v1",
"properties": {
    "linkedServiceName": {
        "referenceName": "AzureBlobStorage",
        "type": "LinkedServiceReference"
    },
    "parameters": {
        "Year": {
            "type": "String",
            "defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')"
        },
        "Month": {
            "type": "String",
            "defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')"
        },
        "Day": {
            "type": "String",
            "defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')"
        }
    },
    "type": "AzureBlob",
    "structure": [
        {
            "name": "tag",
            "type": "String"
        },
        {
            "name": "device_id",
            "type": "String"
        },
        {
            "name": "system_id",
            "type": "String"
        },
        {
            "name": "utc",
            "type": "DateTime"
        },
        {
            "name": "ts",
            "type": "DateTime"
        },
        {
            "name": "median",
            "type": "Double"
        },
        {
            "name": "min",
            "type": "Double"
        },
        {
            "name": "max",
            "type": "Double"
        },
        {
            "name": "avg",
            "type": "Double"
        },
        {
            "name": "stdev",
            "type": "Double"
        },
        {
            "name": "first_value",
            "type": "Double"
        },
        {
            "name": "last_value",
            "type": "Double"
        },
        {
            "name": "message_count",
            "type": "Int16"
        }
    ],
    "typeProperties": {
        "format": {
            "type": "TextFormat",
            "columnDelimiter": ";",
            "nullValue": "\\N",
            "treatEmptyAsNull": true,
            "skipLineCount": 0,
            "firstRowAsHeader": true
        },
        "fileName": "",
        "folderPath": {
            "value": "@concat('d00044653/processed/stream/compressed',dataset().Year,'-',dataset().Month,'-',dataset().Day)",
            "type": "Expression"
        }
    }
},
"type": "Microsoft.DataFactory/factories/datasets"

}

Выходной набор данных для операции копирования:

{
"name": "AzureSQLDataset_T_ASSET_MONITORING_WARM_ASA_v1",
"properties": {
    "linkedServiceName": {
        "referenceName": "AzureSqlDatabase1",
        "type": "LinkedServiceReference"
    },
    "type": "AzureSqlTable",
    "structure": [
        {
            "name": "TAG",
            "type": "String"
        },
        {
            "name": "DEVICE_ID",
            "type": "String"
        },
        {
            "name": "SYSTEM_ID",
            "type": "String"
        },
        {
            "name": "UTC",
            "type": "DateTime"
        },
        {
            "name": "TS",
            "type": "DateTime"
        },
        {
            "name": "MEDIAN",
            "type": "Decimal"
        },
        {
            "name": "MIN",
            "type": "Decimal"
        },
        {
            "name": "MAX",
            "type": "Decimal"
        },
        {
            "name": "AVG",
            "type": "Decimal"
        },
        {
            "name": "STDEV",
            "type": "Decimal"
        },
        {
            "name": "FIRST_VALUE",
            "type": "Decimal"
        },
        {
            "name": "LAST_VALUE",
            "type": "Decimal"
        },
        {
            "name": "MESSAGE_COUNT",
            "type": "Int32"
        }
    ],
    "typeProperties": {
        "tableName": "[dbo].[T_ASSET_MONITORING_WARM]"
    }
},
"type": "Microsoft.DataFactory/factories/datasets"

} * * тысяча двадцать-один

Моя проблема в том, что после публикации ничего не происходит. Любые предложения ??

1 Ответ

0 голосов
/ 14 ноября 2018

Триггер по расписанию не поддерживает сценарий обратной засыпки (в зависимости от определения триггера - вы начинаете с 17 августа 2018 года).При запуске по расписанию конвейер может выполняться только в периоды времени от текущего времени и будущего.

В вашем случае для сценариев обратной засыпки используйте триггерный триггер.

...