Как я могу использовать этот Rest API в Azure Data Factory - PullRequest
0 голосов
/ 09 июля 2019

У меня есть REST API, который мне нужно вызвать из фабрики данных Azure и вставить данные в таблицу SQL.

Формат JSON, возвращаемый из API, имеет следующий формат:

{
    "serviceResponse": {
        "supportOffice": "EUKO",
        "totalPages": 5,
        "pageNo": 1,
        "recordsPerPage": 1000,
        "projects": [
            { "projectID":1 ...} , { "projectID":2 ...} ,...
        ]
    }
}

URL-адрес находится в формате http://server.com/api/Projects?pageNo=1

Мне удалось установитьRestService для вызова API и возврата JSON и приемника SQL, который возьмет JSON и передаст его хранимой процедуре, которая затем сохранит данные.

Однако я борюсь с тем, как справиться с нумерацией страниц.

Я пробовал:

  1. Параметры разбиения на страницы на RestService: я не думаю, что это будет работать, поскольку он допускает только XPATH, который возвращает полный следующий URL.Я не вижу, чтобы он позволял вычислять URL из totalPages и pageNo.(или я не смог заставить его работать)

  2. Я пытался добавить веб-вызов к API перед обработкой, чтобы затем рассчитать количество страниц.Хотя это и не идеально, оно работало, пока я не достиг предела 1 Мб / 1 мин, так как некоторые ответы довольно велики.Это не сработает.

  3. Я пытался увидеть, может ли API измениться, но это невозможно.

Мне было интересно, есть ли у кого-нибудь какие-либо идеи о том, как я могу заставить это работать, или успешно использовал подобный API?

1 Ответ

0 голосов
/ 10 июля 2019

Следующее объяснение будет проходить через создание конвейера, который выглядит следующим образом. Обратите внимание, что используются действия с хранимыми процедурами, веб-действия и действия для каждого pipeline screenshot

for each activities screenshot

Сначала подготовьте базу данных SQL Azure, настройте администратора AAD, затем предоставьте разрешения ADI MSI в базе данных, как описано здесь . Затем создайте следующую таблицу и две хранимые процедуры:

CREATE TABLE [dbo].[People](
    [id] [int] NULL,
    [email] [varchar](255) NULL,
    [first_name] [varchar](100) NULL,
    [last_name] [varchar](100) NULL,
    [avatar] [nvarchar](1000) NULL
)

GO
/*
sample call:
exec uspInsertPeople @json = '{"page":1,"per_page":3,"total":12,"total_pages":4,"data":[{"id":1,"email":"george.bluth@reqres.in","first_name":"George","last_name":"Bluth","avatar":"https://s3.amazonaws.com/uifaces/faces/twitter/calebogden/128.jpg"},{"id":2,"email":"janet.weaver@reqres.in","first_name":"Janet","last_name":"Weaver","avatar":"https://s3.amazonaws.com/uifaces/faces/twitter/josephstein/128.jpg"},{"id":3,"email":"emma.wong@reqres.in","first_name":"Emma","last_name":"Wong","avatar":"https://s3.amazonaws.com/uifaces/faces/twitter/olegpogodaev/128.jpg"}]}'
*/
create proc uspInsertPeople @json nvarchar(max)
as
begin
insert into People (id, email, first_name, last_name, avatar)
select d.*
from OPENJSON(@json)
WITH (
        [data] nvarchar(max) '$.data' as JSON
)
CROSS APPLY OPENJSON([data], '$')
    WITH (
        id int '$.id',
        email varchar(255) '$.email',
        first_name varchar(100) '$.first_name',
        last_name varchar(100) '$.last_name',
        avatar nvarchar(1000) '$.avatar'
    ) d;
end

GO

create proc uspTruncatePeople
as
truncate table People


Затем в Azure Data Factory v2 создайте новый конвейер, переименуйте его в ForEachPage, затем перейдите в представление кода и вставьте следующий JSON:

{
    "name": "ForEachPage",
    "properties": {
        "activities": [
            {
                "name": "GetTotalPages",
                "type": "WebActivity",
                "dependsOn": [
                    {
                        "activity": "Truncate SQL Table",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "url": {
                        "value": "https://reqres.in/api/users?page=1",
                        "type": "Expression"
                    },
                    "method": "GET"
                }
            },
            {
                "name": "ForEachPage",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "GetTotalPages",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@range(1,activity('GetTotalPages').output.total_pages)",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "GetPage",
                            "type": "WebActivity",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "url": {
                                    "value": "@concat('https://reqres.in/api/users?page=',item())",
                                    "type": "Expression"
                                },
                                "method": "GET"
                            }
                        },
                        {
                            "name": "uspInsertPeople Sproc",
                            "type": "SqlServerStoredProcedure",
                            "dependsOn": [
                                {
                                    "activity": "GetPage",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "storedProcedureName": "[dbo].[uspInsertPeople]",
                                "storedProcedureParameters": {
                                    "json": {
                                        "value": {
                                            "value": "@string(activity('GetPage').output)",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    }
                                }
                            },
                            "linkedServiceName": {
                                "referenceName": "lsAzureDB",
                                "type": "LinkedServiceReference"
                            }
                        }
                    ]
                }
            },
            {
                "name": "Truncate SQL Table",
                "type": "SqlServerStoredProcedure",
                "dependsOn": [],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "storedProcedureName": "[dbo].[uspTruncatePeople]"
                },
                "linkedServiceName": {
                    "referenceName": "lsAzureDB",
                    "type": "LinkedServiceReference"
                }
            }
        ],
        "annotations": []
    }
}

Создайте связанную службу lsAzureDB для базы данных SQL Azure, указав в ней использование MSI для аутентификации.

Этот конвейер вызывает образец постраничного API (который работает в данный момент, но это не API, которым я управляю, поэтому в какой-то момент он может перестать работать), чтобы продемонстрировать, как выполнять цикл и как принимать результаты Веб-активности и вставьте их в таблицу SQL с помощью вызова хранимой процедуры и анализа JSON в хранимой процедуре. Цикл будет работать с параллелизмом, но вы, конечно, можете изменить настройки действия ForEachPage, чтобы он выполнялся последовательно.

...