EventGridTrigger для Azure Service Bus Тема - PullRequest
0 голосов
/ 04 марта 2019

Я создал функцию Azure на основе триггера EventGrid.Этот триггер срабатывает всякий раз, когда новое сообщение поступает в тему служебной шины.Ниже приведен шаблон функции, сгенерированный

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public static void Run(JObject eventGridEvent, TraceWriter log)
{
    log.Info(eventGridEvent.ToString(Formatting.Indented));
}

Мое требование от функции Azure - обработать данные и сохранить их в ADLS.Теперь, как мне разобрать / десериализовать данные из типа JObject.Мне нужно нормализовать данные в этой функции, прежде чем сохранять их в хранилище озера данных.Нужно ли перезаписывать функцию?

Пожалуйста, предоставьте некоторые детали / ссылку для удовлетворения этого требования

Ответы [ 2 ]

0 голосов
/ 05 марта 2019

В дополнение к ответу Шона интеграция Azure Service Bus с AEG позволяет создать некоторую функцию контроля для объектов ASB.Обратите внимание, что эта интеграция не похожа на ту, что делается для учетной записи большого двоичного объекта, где события публикуются каждый раз, когда большой двоичный объект создается / удаляется.Другими словами, ASB не будет публиковать событие для каждого сообщения, поступившего в объект ASB, события публикуются как сторожевой таймер объекта.

Этот вид сторожевого устройства сущности использует следующую логику:

  1. Событие не публикуется, если в сущности нет сообщения.
  2. Событие публикуется сразу, когда в сущность поступает первое сообщение, и в течение 360+ секунд для сущности нет активного прослушивателя.
  3. Событие публикуется каждые 120 секунд, когда прослушиватель все еще неактивен, и в сущности есть хотя бы одно сообщение
  4. Событие публикуется после 360 секунд простоя прослушивателя (неактивновремя и еще есть хотя бы одно сообщение в сущности.Например, если у нас есть 5 сообщений в объекте, и подписчик отправит только одно сообщение, используя REST Api, следующее событие будет опубликовано через 360 секунд.Другими словами, сторожевая сущность позволяет держать слушателя в простое в течение 360 секунд.

Исходя из описанного выше поведения «сторожевого объекта», эта функция выглядит более подходящей для медленного обмена сообщениями трафика, например, прослушивания при пробуждении и отслеживании прослушивания объектов ASB.

Обратите внимание, что 360-секундного простоя для слушателя можно избежать, используя политику короткого времени повторения на уровне подписки, поэтому абонент может быть вызван снова 3 раза в течение 5-минутного времени повторения.

В целях тестирования ниже приведен фрагмент кода функции EventGridTrigger для подписчика на события ASB.

#r "..\\bin\\Microsoft.Azure.ServiceBus.dll"
#r "Newtonsoft.Json"

using System;
using System.Threading.Tasks;
using System.Text;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Web;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.ServiceBus.Primitives;



// sasToken cache
static SasTokenHelper helper = new SasTokenHelper(Environment.GetEnvironmentVariable("AzureServiceBusConnectionString"));

public static async Task Run(JObject eventGridEvent, ILogger log)
{
    log.LogInformation(eventGridEvent.ToString());

    // from the eventgrid payload
    var requestUri = $"{eventGridEvent["data"]?["requestUri"]?.Value<string>()}?api-version=2015-01";

    using (var client = new HttpClient())
    {

        client.DefaultRequestHeaders.Add("Authorization", helper.GetSasToken());

        do
        {
            // read & delete the message 
            var response = await client.DeleteAsync(requestUri);

            // check for message
            if (response.StatusCode != HttpStatusCode.OK)
            {
                log.LogWarning($">>> No message <<<");
                break;
            }

            // message body
            string jsontext = await response.Content.ReadAsStringAsync();

            // show the message
            log.LogInformation($"\nHeaders:\n\t{string.Join("\n\t", response.Headers.Select(i => $"{i.Key}={i.Value.First()}"))}\nBody:\n\t{jsontext}");
        } while (true);

    }

    await Task.CompletedTask;
}




// helpers
class SasTokenHelper
{
    DateTime expiringSaS;
    uint sasTTLInMinutes = 10;
    string sasToken = string.Empty;
    (string hostname, string keyname, string key) config;

    public SasTokenHelper(string connectionString)
    {
        config = GetPartsFromConnectionString(connectionString);
        GetSasToken();
    }

    public string GetSasToken()
    {
        lock (sasToken)
        {
            if (expiringSaS < DateTime.UtcNow.AddMinutes(-1))
            {
                this.sasToken = GetSASToken(config.hostname, config.key, config.keyname, sasTTLInMinutes);
                expiringSaS = DateTime.UtcNow.AddMinutes(sasTTLInMinutes);
            }
            return sasToken;
        }
    }

    internal (string hostname, string keyname, string key) GetPartsFromConnectionString(string connectionString)
    {
        var parts = connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries).Select(s => s.Split(new[] { '=' }, 2)).ToDictionary(x => x[0].Trim(), x => x[1].Trim());
        return (parts["Endpoint"] ?? "", parts["SharedAccessKeyName"] ?? "", parts["SharedAccessKey"] ?? "");
    }

    internal string GetSASToken(string resourceUri, string key, string keyName = null, uint minutes = 10)
    {
        var tp = SharedAccessSignatureTokenProvider.CreateSharedAccessSignatureTokenProvider(keyName, key, TimeSpan.FromMinutes(minutes));
        return tp.GetTokenAsync(resourceUri, TimeSpan.FromSeconds(60)).Result.TokenValue;
    }
}
0 голосов
/ 04 марта 2019

Service Bus (Premium) отправляет события для двух сценариев:

  1. ActiveMessagesWithNoListenersAvailable
  2. DeadletterMessagesAvailable

Первое событие будет отправленокогда есть сообщения, связанные с определенной сущностью, и не существует активных слушателей.Сущность будет указана в полезной нагрузке вместе с другой необходимой информацией для доступа к ней (например, пространством имен или темой подписки для получения).Схема определена в документации .

Схема второго события аналогична первой и генерируется для буквенных очередей с неразборчивыми буквами.

Теперь, как мне разобрать / десериализовать данные из типа JObject.Мне нужно нормализовать данные в этой функции, прежде чем сохранять их в хранилище озера данных.Нужно ли перезаписывать функцию?.

eventGridEvent Сам JSON не собирается давать вам сообщение (я) служебной шины Azure.Вам нужно будет узнать, как сначала были сериализованы исходные сообщения, т. Е. Что использовала сторона отправителя.Эта десериализация должна идти в функцию, а затем код для записи объекта озера данных.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...