Потоковая аналитика - Как обрабатывать json в ссылочном вводе - PullRequest
1 голос
/ 13 февраля 2020

У меня есть задание Azure Stream Analytics (ASA), которое обрабатывает данные телеметрии устройства из концентратора событий. Поток должен быть объединен со справочными данными из таблицы sql, чтобы дополнить каждое сообщение дополнительными метаданными устройства. Объединенная запись должна храниться в CosmosDb.

База данных sql для обслуживания метаданных устройства:

CREATE TABLE [dbo].[MyTable]
(
  [DeviceId] NVARCHAR(20) NOT NULL PRIMARY KEY, 
  [MetaData] NVARCHAR(MAX) NULL   /* this stores json, which can vary per record */
)

В ASA я настроил ввод справочных данных с помощью простого запроса:

SELECT DeviceId, JSON_QUERY(MetaData) FROM [dbo].[MyTable]

И У меня есть основной запрос ASA, который выполняет объединение:

WITH temptable AS (
SELECT * FROM [telemetry-input] TD PARTITION BY PartitionId
LEFT OUTER JOIN [metadata-input] MD
ON TD.DeviceId = MD.DeviceId
)

SELECT TD.*, MD.MetaData 
INTO [cosmos-db-output] 
FROM temptable PARTITION BY PartitionId

Все это работает, и объединенные данные сохраняются в CosmosDb. Однако значение столбца метаданных из sql обрабатывается как строка и сохраняется в comos с кавычками и escape-символами. Пример:

{ "DeviceId" : "abc1234", … , "MetaData" : "{ \"TestKey\": \"test value\" }" };

Есть ли способ обработки и хранения json из метаданных как правильного Json объекта, то есть

{ "DeviceId" : "abc1234", … , "MetaData" : { "TestKey": "test value" } };

Ответы [ 2 ]

1 голос
/ 15 февраля 2020

Я нашел способ добиться этого в ASA - вам нужно создать javascript пользовательскую функцию :

function parseJson(strjson){
          return JSON.parse(strjson);
}

И вызвать ее в своем запросе:

...
SELECT TD.*, udf.parseJson(MD.MetaData)
...
1 голос
/ 14 февраля 2020

Как вы упомянули в своем вопросе, справочные данные json обрабатываются как json строка, а не json объект. Основываясь на моих исследованиях Синтаксиса запросов в ASA, нет встроенной функции для преобразования этого.

Однако я бы предложил вам использовать Azure Функция Космос DB Trigger для обработки каждого созданного документа. Пожалуйста, обратитесь к моему функциональному коду:

using System;
using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json.Linq;

namespace ProcessJson
{
    public class Class1
    {
        [FunctionName("DocumentUpdates")]
        public static void Run(
        [CosmosDBTrigger(databaseName:"db",collectionName: "item", ConnectionStringSetting = "CosmosDBConnection",LeaseCollectionName = "leases",
            CreateLeaseCollectionIfNotExists = true)]
        IReadOnlyList<Document> documents,
        TraceWriter log)
        {
            log.Verbose("Start.........");
            String endpointUrl = "https://***.documents.azure.com:443/";
            String authorizationKey = "***";
            String databaseId = "db";
            String collectionId = "import";

            DocumentClient client = new DocumentClient(new Uri(endpointUrl), authorizationKey);

            for (int i = 0; i < documents.Count; i++)
            {
                Document doc = documents[i];
                if((doc.alreadyFormat == Undefined.Value) ||(!doc.alreadyFormat)){
                   String MetaData = doc.GetPropertyValue<String>("MetaData");
                   JObject o = JObject.Parse(MetaData);

                   doc.SetPropertyValue("MetaData", o);
                   doc.SetPropertyValue("alreadyFormat", true);
                   client.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(databaseId, collectionId, doc.Id), doc); 

                   log.Verbose("Update document Id " + doc.Id);

                }

            }
        }
    }
}

Кроме того, пожалуйста, обратитесь к делу: Azure Cosmos DB SQL - как удалить внутреннее json свойство

...