I am trying to migrate a pipeline that already exists in ADF V1 to ADF V2, and I am having some trouble with the concept of triggers. My pipeline has two activities: the first is an Azure Data Lake Analytics U-SQL activity and the second is a Copy activity.
The first activity runs a U-SQL script that reads data from a partitioned folder /{yyyy}/{MM}/{dd}/, processes it, and writes the result to a folder /{yyyy}-{MM}-{dd}/.
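To give an idea of what the script does (the real compression.usql is more involved; the input root, the file name and the extracted schema below are illustrative rather than my actual ones), it is structured roughly like this:
// Simplified sketch of compression.usql, not the actual script:
// the input root, file name and extracted schema are illustrative.
// Year/Month/Day come from the ADF pipeline parameters shown below.
DECLARE EXTERNAL @Year string = "2018";
DECLARE EXTERNAL @Month string = "08";
DECLARE EXTERNAL @Day string = "17";

@raw =
    EXTRACT tag string,
            device_id string,
            system_id string,
            ts DateTime,
            value double
    FROM "/d00044653/raw/stream/" + @Year + "/" + @Month + "/" + @Day + "/data.csv"
    USING Extractors.Csv(skipFirstNRows : 1);

// ... the actual compression/aggregation logic is omitted here ...
@compressed =
    SELECT tag,
           device_id,
           system_id,
           MIN(value) AS min,
           MAX(value) AS max,
           AVG(value) AS avg,
           COUNT(*) AS message_count
    FROM @raw
    GROUP BY tag, device_id, system_id;

// The output goes into the dated folder that the Copy activity below reads from.
OUTPUT @compressed
TO "/d00044653/processed/stream/compressed" + @Year + "-" + @Month + "-" + @Day + "/data.csv"
USING Outputters.Csv(outputHeader : true);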
Here are the relevant JSON definitions from my factory (the pipeline, the trigger, and the datasets).
Pipeline:
{
"name": "StreamCompressionBlob2SQL",
"properties": {
"activities": [
{
"name": "compress",
"type": "DataLakeAnalyticsU-SQL",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"scriptPath": "d00044653/azure-configurations/usql-scripts/stream/compression.usql",
"scriptLinkedService": {
"referenceName": "AzureBlobStorage",
"type": "LinkedServiceReference"
},
"parameters": {
"Year": {
"value": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')",
"type": "Expression"
},
"Month": {
"value": "@formatDateTime(pipeline().parameters.windowStartTime,'MM')",
"type": "Expression"
},
"Day": {
"value": "@formatDateTime(pipeline().parameters.windowStartTime,'dd')",
"type": "Expression"
}
}
},
"linkedServiceName": {
"referenceName": "AzureDataLakeAnalytics1",
"type": "LinkedServiceReference"
}
},
{
"name": "Blob2SQL",
"type": "Copy",
"dependsOn": [
{
"activity": "compress",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": true
},
"sink": {
"type": "SqlSink",
"writeBatchSize": 10000
},
"enableStaging": false,
"dataIntegrationUnits": 0,
"translator": {
"type": "TabularTranslator",
"columnMappings": {
"tag": "TAG",
"device_id": "DEVICE_ID",
"system_id": "SYSTEM_ID",
"utc": "UTC",
"ts": "TS",
"median": "MEDIAN",
"min": "MIN",
"max": "MAX",
"avg": "AVG",
"stdev": "STDEV",
"first_value": "FIRST_VALUE",
"last_value": "LAST_VALUE",
"message_count": "MESSAGE_COUNT"
}
}
},
"inputs": [
{
"referenceName": "AzureBlobDataset_COMPRESSED_ASA_v1",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "AzureSQLDataset_T_ASSET_MONITORING_WARM_ASA_v1",
"type": "DatasetReference"
}
]
}
],
"parameters": {
"windowStartTime": {
"type": "String"
}
}
}
}
Trigger:
{
"name": "trigger1",
"properties": {
"runtimeState": "Started",
"pipelines": [
{
"pipelineReference": {
"referenceName": "StreamCompressionBlob2SQL",
"type": "PipelineReference"
},
"parameters": {
"windowStartTime": "@trigger().scheduledTime"
}
}
],
"type": "ScheduleTrigger",
"typeProperties": {
"recurrence": {
"frequency": "Day",
"interval": 1,
"startTime": "2018-08-17T10:46:00.000Z",
"endTime": "2018-11-04T10:46:00.000Z",
"timeZone": "UTC"
}
}
}
}
Input dataset for the Copy activity:
{
"name": "AzureBlobDataset_COMPRESSED_ASA_v1",
"properties": {
"linkedServiceName": {
"referenceName": "AzureBlobStorage",
"type": "LinkedServiceReference"
},
"parameters": {
"Year": {
"type": "String",
"defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')"
},
"Month": {
"type": "String",
"defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')"
},
"Day": {
"type": "String",
"defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')"
}
},
"type": "AzureBlob",
"structure": [
{
"name": "tag",
"type": "String"
},
{
"name": "device_id",
"type": "String"
},
{
"name": "system_id",
"type": "String"
},
{
"name": "utc",
"type": "DateTime"
},
{
"name": "ts",
"type": "DateTime"
},
{
"name": "median",
"type": "Double"
},
{
"name": "min",
"type": "Double"
},
{
"name": "max",
"type": "Double"
},
{
"name": "avg",
"type": "Double"
},
{
"name": "stdev",
"type": "Double"
},
{
"name": "first_value",
"type": "Double"
},
{
"name": "last_value",
"type": "Double"
},
{
"name": "message_count",
"type": "Int16"
}
],
"typeProperties": {
"format": {
"type": "TextFormat",
"columnDelimiter": ";",
"nullValue": "\\N",
"treatEmptyAsNull": true,
"skipLineCount": 0,
"firstRowAsHeader": true
},
"fileName": "",
"folderPath": {
"value": "@concat('d00044653/processed/stream/compressed',dataset().Year,'-',dataset().Month,'-',dataset().Day)",
"type": "Expression"
}
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
Output dataset for the Copy activity:
{
"name": "AzureSQLDataset_T_ASSET_MONITORING_WARM_ASA_v1",
"properties": {
"linkedServiceName": {
"referenceName": "AzureSqlDatabase1",
"type": "LinkedServiceReference"
},
"type": "AzureSqlTable",
"structure": [
{
"name": "TAG",
"type": "String"
},
{
"name": "DEVICE_ID",
"type": "String"
},
{
"name": "SYSTEM_ID",
"type": "String"
},
{
"name": "UTC",
"type": "DateTime"
},
{
"name": "TS",
"type": "DateTime"
},
{
"name": "MEDIAN",
"type": "Decimal"
},
{
"name": "MIN",
"type": "Decimal"
},
{
"name": "MAX",
"type": "Decimal"
},
{
"name": "AVG",
"type": "Decimal"
},
{
"name": "STDEV",
"type": "Decimal"
},
{
"name": "FIRST_VALUE",
"type": "Decimal"
},
{
"name": "LAST_VALUE",
"type": "Decimal"
},
{
"name": "MESSAGE_COUNT",
"type": "Int32"
}
],
"typeProperties": {
"tableName": "[dbo].[T_ASSET_MONITORING_WARM]"
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
My problem is that after publishing, nothing happens: the trigger does not start any pipeline runs.
Any suggestions?