Я создал службу для поддержки нескольких подписок в очереди в служебной шине Azure, но у меня странное поведение.
Мой одноэлементный класс подписки имеет метод, который выглядит следующим образом:
public void Subscribe<TMessage>(Func<TMessage, Task> execution, int maxDop = 1, int ttl = 60) where TMessage : IServiceBusMessage
{
try
{
var messageLifespan = TimeSpan.FromSeconds(ttl);
var messageType = typeof(TMessage);
if (!_activeSubscriptionClients.TryGetValue(messageType, out var subscriptionClient))
{
subscriptionClient = _subscriptionClientFactory.Create(typeof(TMessage)).GetAwaiter().GetResult();
if (subscriptionClient.OperationTimeout < messageLifespan) subscriptionClient.OperationTimeout = messageLifespan;
if (subscriptionClient.ServiceBusConnection.OperationTimeout < messageLifespan)
subscriptionClient.ServiceBusConnection.OperationTimeout = messageLifespan;
_activeSubscriptionClients.AddOrUpdate(messageType, subscriptionClient, (key, value) => value);
}
var messageHandlerOptions = new MessageHandlerOptions(OnException)
{
MaxConcurrentCalls = maxDop,
AutoComplete = false,
MaxAutoRenewDuration = messageLifespan,
};
subscriptionClient.RegisterMessageHandler(
async (azureMessage, cancellationToken) =>
{
try
{
var textPayload = _encoding.GetString(azureMessage.Body);
var message = JsonConvert.DeserializeObject<TMessage>(textPayload);
if (message == null)
throw new FormatException($"Cannot deserialize the message payload to type '{typeof(TMessage).FullName}'.");
await execution.Invoke(message);
await subscriptionClient.CompleteAsync(azureMessage.SystemProperties.LockToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "ProcessMessagesAsync(Message, CancellationToken)");
await subscriptionClient.AbandonAsync(azureMessage.SystemProperties.LockToken);
}
}
, messageHandlerOptions);
}
catch (Exception ex)
{
_logger.LogError(ex, "Subscribe(Action<TMessage>)");
throw;
}
}
Идея заключается в том, что вы подписываетесь на служебную шину Azure для определенного типа сообщений, и это напрямую соответствует очереди.В вашей подписке вы передаете делегату способ обработки сообщения.
Это похоже на работу ... с одним предупреждением.
Независимо от того, что я установил ttl
дляMaxAutoRenewDuration
, или OperationTimeout
, в длительном процессе для любого данного сообщения, через минуту сообщение разблокируется из очереди, и другой подписчик забирает его и начинает его обрабатывать.
Мое пониманиеэто то, что именно то, что MaxAutoRenewDuration
должно предотвратить ... но, похоже, ничего не предотвращает.
Может кто-нибудь сказать мне, что мне нужно сделать по-другому, чтобы убедиться, что потребитель владеет сообщениемдо завершения?