Использование ADF REST-разъема для чтения и преобразования данных FHIR - PullRequest
1 голос
/ 25 апреля 2019

Я пытаюсь использовать фабрику данных Azure для чтения данных с сервера FHIR и преобразования результатов в файлы JSON (ndjson) с разделителями новой строки в хранилище BLOB-объектов Azure. В частности, если вы запрашиваете сервер FHIR, вы можете получить что-то вроде:

{
    "resourceType": "Bundle",
    "id": "som-id",
    "type": "searchset",
    "link": [
        {
            "relation": "next",
            "url": "https://fhirserver/?ct=token"
        },
        {
            "relation": "self",
            "url": "https://fhirserver/"
        }
    ],
    "entry": [
        {
            "fullUrl": "https://fhirserver/Organization/1234",
            "resource": {
                "resourceType": "Organization",
                "id": "1234",
                // More fields
        },
        {
            "fullUrl": "https://fhirserver/Organization/456",
            "resource": {
                "resourceType": "Organization",
                "id": "456",
                // More fields
        },

        // More resources
    ]
}

В основном набор ресурсов. Я хотел бы преобразовать это в файл с разделителями новой строки (он же ndjson), где каждая строка - это просто json для ресурса:

{"resourceType": "Organization", "id": "1234", // More fields }
{"resourceType": "Organization", "id": "456", // More fields }
// More lines with resources

Я могу настроить соединитель REST и он может запросить сервер FHIR (включая нумерацию страниц), но, что бы я ни пытался, я не могу генерировать желаемый вывод. Я установил набор данных хранилища BLOB-объектов Azure:

{
    "name": "AzureBlob1",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureBlobStorage1",
            "type": "LinkedServiceReference"
        },
        "type": "AzureBlob",
        "typeProperties": {
            "format": {
                "type": "JsonFormat",
                "filePattern": "setOfObjects"
            },
            "fileName": "myout.json",
            "folderPath": "outfhirfromadf"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

И настроить операцию копирования:

{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Copy Data1",
                "type": "Copy",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "RestSource",
                        "httpRequestTimeout": "00:01:40",
                        "requestInterval": "00.00:00:00.010"
                    },
                    "sink": {
                        "type": "BlobSink"
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "schemaMapping": {
                            "resource": "resource"
                        },
                        "collectionReference": "$.entry"
                    }
                },
                "inputs": [
                    {
                        "referenceName": "FHIRSource",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "AzureBlob1",
                        "type": "DatasetReference"
                    }
                ]
            }
        ]
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}

Но в конце (несмотря на настройку сопоставления схемы) конечным результатом в BLOB-объекте всегда является только исходный пакет, возвращаемый с сервера. Если я сконфигурирую выходной BLOB-объект как текст с разделителями-запятыми, я смогу извлечь поля и создать плоское табличное представление, но это не совсем то, что я хочу.

Любые предложения будут высоко оценены.

Ответы [ 2 ]

0 голосов
/ 26 апреля 2019

Так что я вроде нашел решение.Если я сделаю первоначальный шаг преобразования, когда пакеты просто выгружаются в файл JSON, а затем выполню еще одно преобразование из файла JSON в то, что я притворяюсь текстовым файлом, в другой большой двоичный объект, я могу создать файл njson.

По сути, определите другой набор данных BLOB-объектов:

{
    "name": "AzureBlob2",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureBlobStorage1",
            "type": "LinkedServiceReference"
        },
        "type": "AzureBlob",
        "structure": [
            {
                "name": "Prop_0",
                "type": "String"
            }
        ],
        "typeProperties": {
            "format": {
                "type": "TextFormat",
                "columnDelimiter": ",",
                "rowDelimiter": "",
                "quoteChar": "",
                "nullValue": "\\N",
                "encodingName": null,
                "treatEmptyAsNull": true,
                "skipLineCount": 0,
                "firstRowAsHeader": false
            },
            "fileName": "myout.json",
            "folderPath": "adfjsonout2"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

Обратите внимание, что этот TextFormat, а также отметьте, что quoteChar пусто.Если я затем добавлю еще одну операцию копирования:

{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Copy Data1",
                "type": "Copy",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "RestSource",
                        "httpRequestTimeout": "00:01:40",
                        "requestInterval": "00.00:00:00.010"
                    },
                    "sink": {
                        "type": "BlobSink"
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "schemaMapping": {
                            "['resource']": "resource"
                        },
                        "collectionReference": "$.entry"
                    }
                },
                "inputs": [
                    {
                        "referenceName": "FHIRSource",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "AzureBlob1",
                        "type": "DatasetReference"
                    }
                ]
            },
            {
                "name": "Copy Data2",
                "type": "Copy",
                "dependsOn": [
                    {
                        "activity": "Copy Data1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "BlobSource",
                        "recursive": true
                    },
                    "sink": {
                        "type": "BlobSink"
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "columnMappings": {
                            "resource": "Prop_0"
                        }
                    }
                },
                "inputs": [
                    {
                        "referenceName": "AzureBlob1",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "AzureBlob2",
                        "type": "DatasetReference"
                    }
                ]
            }
        ]
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}

, тогда все получится.Это не идеально, потому что у меня теперь есть две копии данных в BLOB-объектах, но, я полагаю, одну можно легко удалить.

Я все равно хотел бы услышать об этом, если у кого-то есть одношаговое решение.

0 голосов
/ 26 апреля 2019

Как кратко обсуждалось в комментарии, Copy Activity не предоставляет много функциональных возможностей помимо данных отображения.Как указано в документации, операция Копирование выполняет следующие операции:

  1. Считывает данные из исходного хранилища данных.
  2. Выполняет сериализацию / десериализацию, сжатие / декомпрессия, сопоставление столбцов и т. д. Эти операции выполняются на основе конфигураций входного набора данных, выходного набора данных и операции копирования.
  3. Записывает данные в хранилище данных приемника / назначения.

Не похоже, что Copy Activity делает что-то еще, кроме эффективного копирования материала вокруг.

Я обнаружил, что работал - использовать Databrick.

Вот шаги:

  1. Добавление учетной записи Databricks к вашей подписке;
  2. Перейдите на страницу Databricks, нажав кнопку авторинга;
  3. Создать блокнот;
  4. написать скрипт (Scala, Python или .Net был недавно анонсирован ).

Скрипт будет следующим:

  1. Считать данные из хранилища BLOB-объектов;
  2. Отфильтровать и преобразовать данные по мере необходимости;
  3. Записать данные обратно в хранилище BLOB-объектов;

Вы можете проверить свой сценарий оттуда и, когда все будет готово, вы можете вернуться к своему конвейеру и создать Блокнотдействие , которое будет указывать на вашу записную книжку, содержащую сценарий.

Я боролся за кодирование в Scala, но оно того стоило:)

...