В дополнение к ответу Шона интеграция Azure Service Bus с AEG позволяет создать некоторую функцию контроля для объектов ASB.Обратите внимание, что эта интеграция не похожа на ту, что делается для учетной записи большого двоичного объекта, где события публикуются каждый раз, когда большой двоичный объект создается / удаляется.Другими словами, ASB не будет публиковать событие для каждого сообщения, поступившего в объект ASB, события публикуются как сторожевой таймер объекта.
Этот вид сторожевого устройства сущности использует следующую логику:
- Событие не публикуется, если в сущности нет сообщения.
- Событие публикуется сразу, когда в сущность поступает первое сообщение, и в течение 360+ секунд для сущности нет активного прослушивателя.
- Событие публикуется каждые 120 секунд, когда прослушиватель все еще неактивен, и в сущности есть хотя бы одно сообщение
- Событие публикуется после 360 секунд простоя прослушивателя (неактивновремя и еще есть хотя бы одно сообщение в сущности.Например, если у нас есть 5 сообщений в объекте, и подписчик отправит только одно сообщение, используя REST Api, следующее событие будет опубликовано через 360 секунд.Другими словами, сторожевая сущность позволяет держать слушателя в простое в течение 360 секунд.
Исходя из описанного выше поведения «сторожевого объекта», эта функция выглядит более подходящей для медленного обмена сообщениями трафика, например, прослушивания при пробуждении и отслеживании прослушивания объектов ASB.
Обратите внимание, что 360-секундного простоя для слушателя можно избежать, используя политику короткого времени повторения на уровне подписки, поэтому абонент может быть вызван снова 3 раза в течение 5-минутного времени повторения.
В целях тестирования ниже приведен фрагмент кода функции EventGridTrigger для подписчика на события ASB.
#r "..\\bin\\Microsoft.Azure.ServiceBus.dll"
#r "Newtonsoft.Json"
using System;
using System.Threading.Tasks;
using System.Text;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Web;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Azure.ServiceBus.Primitives;
// sasToken cache
static SasTokenHelper helper = new SasTokenHelper(Environment.GetEnvironmentVariable("AzureServiceBusConnectionString"));
public static async Task Run(JObject eventGridEvent, ILogger log)
{
log.LogInformation(eventGridEvent.ToString());
// from the eventgrid payload
var requestUri = $"{eventGridEvent["data"]?["requestUri"]?.Value<string>()}?api-version=2015-01";
using (var client = new HttpClient())
{
client.DefaultRequestHeaders.Add("Authorization", helper.GetSasToken());
do
{
// read & delete the message
var response = await client.DeleteAsync(requestUri);
// check for message
if (response.StatusCode != HttpStatusCode.OK)
{
log.LogWarning($">>> No message <<<");
break;
}
// message body
string jsontext = await response.Content.ReadAsStringAsync();
// show the message
log.LogInformation($"\nHeaders:\n\t{string.Join("\n\t", response.Headers.Select(i => $"{i.Key}={i.Value.First()}"))}\nBody:\n\t{jsontext}");
} while (true);
}
await Task.CompletedTask;
}
// helpers
class SasTokenHelper
{
DateTime expiringSaS;
uint sasTTLInMinutes = 10;
string sasToken = string.Empty;
(string hostname, string keyname, string key) config;
public SasTokenHelper(string connectionString)
{
config = GetPartsFromConnectionString(connectionString);
GetSasToken();
}
public string GetSasToken()
{
lock (sasToken)
{
if (expiringSaS < DateTime.UtcNow.AddMinutes(-1))
{
this.sasToken = GetSASToken(config.hostname, config.key, config.keyname, sasTTLInMinutes);
expiringSaS = DateTime.UtcNow.AddMinutes(sasTTLInMinutes);
}
return sasToken;
}
}
internal (string hostname, string keyname, string key) GetPartsFromConnectionString(string connectionString)
{
var parts = connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries).Select(s => s.Split(new[] { '=' }, 2)).ToDictionary(x => x[0].Trim(), x => x[1].Trim());
return (parts["Endpoint"] ?? "", parts["SharedAccessKeyName"] ?? "", parts["SharedAccessKey"] ?? "");
}
internal string GetSASToken(string resourceUri, string key, string keyName = null, uint minutes = 10)
{
var tp = SharedAccessSignatureTokenProvider.CreateSharedAccessSignatureTokenProvider(keyName, key, TimeSpan.FromMinutes(minutes));
return tp.GetTokenAsync(resourceUri, TimeSpan.FromSeconds(60)).Result.TokenValue;
}
}