Следующее объяснение будет проходить через создание конвейера, который выглядит следующим образом. Обратите внимание, что используются действия с хранимыми процедурами, веб-действия и действия для каждого
Сначала подготовьте базу данных SQL Azure, настройте администратора AAD, затем предоставьте разрешения ADI MSI в базе данных, как описано здесь . Затем создайте следующую таблицу и две хранимые процедуры:
CREATE TABLE [dbo].[People](
[id] [int] NULL,
[email] [varchar](255) NULL,
[first_name] [varchar](100) NULL,
[last_name] [varchar](100) NULL,
[avatar] [nvarchar](1000) NULL
)
GO
/*
sample call:
exec uspInsertPeople @json = '{"page":1,"per_page":3,"total":12,"total_pages":4,"data":[{"id":1,"email":"george.bluth@reqres.in","first_name":"George","last_name":"Bluth","avatar":"https://s3.amazonaws.com/uifaces/faces/twitter/calebogden/128.jpg"},{"id":2,"email":"janet.weaver@reqres.in","first_name":"Janet","last_name":"Weaver","avatar":"https://s3.amazonaws.com/uifaces/faces/twitter/josephstein/128.jpg"},{"id":3,"email":"emma.wong@reqres.in","first_name":"Emma","last_name":"Wong","avatar":"https://s3.amazonaws.com/uifaces/faces/twitter/olegpogodaev/128.jpg"}]}'
*/
create proc uspInsertPeople @json nvarchar(max)
as
begin
insert into People (id, email, first_name, last_name, avatar)
select d.*
from OPENJSON(@json)
WITH (
[data] nvarchar(max) '$.data' as JSON
)
CROSS APPLY OPENJSON([data], '$')
WITH (
id int '$.id',
email varchar(255) '$.email',
first_name varchar(100) '$.first_name',
last_name varchar(100) '$.last_name',
avatar nvarchar(1000) '$.avatar'
) d;
end
GO
create proc uspTruncatePeople
as
truncate table People
Затем в Azure Data Factory v2 создайте новый конвейер, переименуйте его в ForEachPage, затем перейдите в представление кода и вставьте следующий JSON:
{
"name": "ForEachPage",
"properties": {
"activities": [
{
"name": "GetTotalPages",
"type": "WebActivity",
"dependsOn": [
{
"activity": "Truncate SQL Table",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"url": {
"value": "https://reqres.in/api/users?page=1",
"type": "Expression"
},
"method": "GET"
}
},
{
"name": "ForEachPage",
"type": "ForEach",
"dependsOn": [
{
"activity": "GetTotalPages",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@range(1,activity('GetTotalPages').output.total_pages)",
"type": "Expression"
},
"activities": [
{
"name": "GetPage",
"type": "WebActivity",
"dependsOn": [],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"url": {
"value": "@concat('https://reqres.in/api/users?page=',item())",
"type": "Expression"
},
"method": "GET"
}
},
{
"name": "uspInsertPeople Sproc",
"type": "SqlServerStoredProcedure",
"dependsOn": [
{
"activity": "GetPage",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"storedProcedureName": "[dbo].[uspInsertPeople]",
"storedProcedureParameters": {
"json": {
"value": {
"value": "@string(activity('GetPage').output)",
"type": "Expression"
},
"type": "String"
}
}
},
"linkedServiceName": {
"referenceName": "lsAzureDB",
"type": "LinkedServiceReference"
}
}
]
}
},
{
"name": "Truncate SQL Table",
"type": "SqlServerStoredProcedure",
"dependsOn": [],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"storedProcedureName": "[dbo].[uspTruncatePeople]"
},
"linkedServiceName": {
"referenceName": "lsAzureDB",
"type": "LinkedServiceReference"
}
}
],
"annotations": []
}
}
Создайте связанную службу lsAzureDB для базы данных SQL Azure, указав в ней использование MSI для аутентификации.
Этот конвейер вызывает образец постраничного API (который работает в данный момент, но это не API, которым я управляю, поэтому в какой-то момент он может перестать работать), чтобы продемонстрировать, как выполнять цикл и как принимать результаты Веб-активности и вставьте их в таблицу SQL с помощью вызова хранимой процедуры и анализа JSON в хранимой процедуре. Цикл будет работать с параллелизмом, но вы, конечно, можете изменить настройки действия ForEachPage, чтобы он выполнялся последовательно.