Блокировка сообщений служебной шины Azure не обновляется? - PullRequest
0 голосов
/ 29 января 2019

Я создал службу для поддержки нескольких подписок в очереди в служебной шине Azure, но у меня странное поведение.

Мой одноэлементный класс подписки имеет метод, который выглядит следующим образом:

    public void Subscribe<TMessage>(Func<TMessage, Task> execution, int maxDop = 1, int ttl = 60) where TMessage : IServiceBusMessage
    {
        try
        {
            var messageLifespan = TimeSpan.FromSeconds(ttl);
            var messageType = typeof(TMessage);
            if (!_activeSubscriptionClients.TryGetValue(messageType, out var subscriptionClient))
            {
                subscriptionClient = _subscriptionClientFactory.Create(typeof(TMessage)).GetAwaiter().GetResult();
                if (subscriptionClient.OperationTimeout < messageLifespan) subscriptionClient.OperationTimeout = messageLifespan;
                if (subscriptionClient.ServiceBusConnection.OperationTimeout < messageLifespan)
                    subscriptionClient.ServiceBusConnection.OperationTimeout = messageLifespan;
                _activeSubscriptionClients.AddOrUpdate(messageType, subscriptionClient, (key, value) => value);
            }

            var messageHandlerOptions = new MessageHandlerOptions(OnException)
            {
                MaxConcurrentCalls = maxDop,
                AutoComplete = false,
                MaxAutoRenewDuration = messageLifespan,
            };


            subscriptionClient.RegisterMessageHandler(
                async (azureMessage, cancellationToken) =>
                {
                    try
                    {
                        var textPayload = _encoding.GetString(azureMessage.Body);
                        var message = JsonConvert.DeserializeObject<TMessage>(textPayload);
                        if (message == null)
                            throw new FormatException($"Cannot deserialize the message payload to type '{typeof(TMessage).FullName}'.");
                        await execution.Invoke(message);
                        await subscriptionClient.CompleteAsync(azureMessage.SystemProperties.LockToken);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "ProcessMessagesAsync(Message, CancellationToken)");
                        await subscriptionClient.AbandonAsync(azureMessage.SystemProperties.LockToken);
                    }
                }
                , messageHandlerOptions);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Subscribe(Action<TMessage>)");
            throw;
        }
    }

Идея заключается в том, что вы подписываетесь на служебную шину Azure для определенного типа сообщений, и это напрямую соответствует очереди.В вашей подписке вы передаете делегату способ обработки сообщения.

Это похоже на работу ... с одним предупреждением.

Независимо от того, что я установил ttl дляMaxAutoRenewDuration, или OperationTimeout, в длительном процессе для любого данного сообщения, через минуту сообщение разблокируется из очереди, и другой подписчик забирает его и начинает его обрабатывать.

Мое пониманиеэто то, что именно то, что MaxAutoRenewDuration должно предотвратить ... но, похоже, ничего не предотвращает.

Может кто-нибудь сказать мне, что мне нужно сделать по-другому, чтобы убедиться, что потребитель владеет сообщениемдо завершения?

Ответы [ 2 ]

0 голосов
/ 04 февраля 2019

Оказывается, что удаленный процесс, на котором работал потребитель, молча терпел неудачу и не возвращал код состояния ошибки (или что-либо еще);механизм автоматического обновления завис в ожидании результата, поэтому время ожидания сообщения истекло.

Я не знаю, как это предотвратить, но как только я исправил проблему в удаленном процессе, проблема былабольше не воспроизводится.

Мораль истории: если все выглядит правильно и время ожидания истекло, кажется, что механизм автообновления разделяет некоторые ресурсы с асинхронными операциями, которые вы ожидаете.Это может быть другое место для поиска сбоев.

0 голосов
/ 30 января 2019

Есть несколько вариантов, на которые я могу обратить ваше внимание.

  1. Вместо использования по умолчанию ReceiveMode = PeekLock в SubscriptionClient, установите его на ReceiveAndDeleteсообщение получено, оно будет удалено из очереди и не будет использовано другими клиентами, это означает, что вы должны корректно обработать исключение и повторить попытку самостоятельно;

  2. Взгляните на OperationTimeout, который, согласно документу, предназначен для Duration after which individual operations will timeout

...