Ошибка RenewLock с использованием ServiceBus SubscriptionClient - PullRequest
0 голосов
/ 19 декабря 2018

Я пишу консольное приложение в .net Core 2.1, и я намерен прослушивать сообщения в теме в ServiceBus и обрабатывать новые сообщения, поступающие в Elasticsearch с использованием API NEST (NEST, вероятно, не имеет значения для моего вопроса, но хотелбыть прозрачным).

Моя сущность Topic в ServiceBus называется «test», и у меня есть подписка, также называемая «test» (полный путь будет «test / subscription / test»).

В моем консольном приложении .net Core у меня есть следующие ссылки на NuGet:

<PackageReference Include="Microsoft.Azure.ServiceBus" Version="3.2.1" />
<PackageReference Include="NEST" Version="6.4.1" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" />

У меня очень странная проблема при использовании .net Standard ServiceBus Api, когда я регулярно получаю ошибку возобновления блокировки:

Обработчик сообщений обнаружил исключение Microsoft.Azure.ServiceBus.MessageLockLostException

Я переместил свой код обратно в очень воспроизводимый образец здесь:

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Microsoft.Azure.ServiceBus;
using Nest;
using Newtonsoft.Json;

namespace SampleApp
{
    public class Program
    {

    private static SubscriptionClient _subscriptionClient;
    private static IElasticClient _elasticClient;

    private static string ServiceBusConnectionString = "[connectionString]";
    private static string TopicName = "test";
    private static string SubscriptionName = "test";

    public static void Main(string[] args)
    {
        var elasticsearchSettings = new ConnectionSettings(new SingleNodeConnectionPool(new Uri("http://does.not.exist:9200"))).DefaultIndex("DoesNotExistIndex");
        _elasticClient = new ElasticClient(elasticsearchSettings);

        _subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);

        // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
            // Set it according to how many messages the application wants to process in parallel.
            MaxConcurrentCalls = 1,
            MaxAutoRenewDuration = TimeSpan.FromSeconds(400),
            // Indicates whether the message pump should automatically complete the messages after returning from user callback.
            // False below indicates the complete operation is handled by the user callback as in ProcessMessagesAsync().
            AutoComplete = false
        };

        // Register the function that processes messages.
        _subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);

        Console.WriteLine("INFO: Process message handler registered, listening for messages");
        Console.Read();
    }

    private static async Task ProcessMessagesAsync(Message message, CancellationToken token)
    {
        // Message received.
        var content = Encoding.UTF8.GetString(message.Body);

        var messageBody = JsonConvert.DeserializeObject<string[]>(content);

        Console.WriteLine($"INFO: Message arrived: {message}");
        Console.WriteLine($"INFO: Message body: \"{string.Join(",", messageBody)}\"");
        try
        {
            var response = _elasticClient.Ping();

            if (!response.IsValid && response.OriginalException != null)
                Console.WriteLine($"ERROR: ElasticSearch could not be reached, error was \"{response.OriginalException.Message}\"");
            else
                Console.WriteLine("INFO: ElasticSearch was contacted successfully");
        }
        catch (Exception e)
        {
            Console.WriteLine("!ERROR!: " + e);
        }

        await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
        Console.WriteLine("INFO: Message completed");
    }

    // Use this handler to examine the exceptions received on the message pump.
    private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}: " +
                          $"{exceptionReceivedEventArgs.ExceptionReceivedContext.Action}: " +
                          $"{exceptionReceivedEventArgs.ExceptionReceivedContext.EntityPath}");
        return Task.CompletedTask;
    }

}

Этот код почти идентичен примеру, взятому здесь: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-how-to-use-topics-subscriptions

Я намеренно "пингую" экземпляр ElasticsearchОн не существует, чтобы создать исключение сокета, которое помогает мне воспроизвести проблему.

Одна вещь, которую я заметил, это то, что когда я создаю новую тему и имею EnabledPartioning = false ,проблема не возникает.

Кто-нибудь видел это раньше?Кажется, проблема глубоко в самом коде ServiceBus.

Примечание. Я пытался использовать Receiver для чтения сообщений, используя «ReceiveAsync», и я также получаю эту ошибку в этом сценарии.Кроме того, мой драйвер для тестирования - это перемещение клиента .NET Framework ServiceBus (который работает с разделами) и на версию .net Core.

Заранее спасибо за любые указатели !!

Ответы [ 2 ]

0 голосов
/ 11 января 2019

В моем случае выше, проблема заключалась в небольшом неправильном понимании моей конфигурации.В Azure, если вы переходите к:

Группа ресурсов> ServiceBusInstance> Темы> testTopic> testSubscription

Вы можете найти свойства подписки.Здесь вы увидите продолжительность блокировки при отправке сообщения.По умолчанию это 60 секунд, но я продлил длительный процесс до 5 минут, как показано ниже:

enter image description here

Затем в коде при подключениичтобы настроить свойства для моего клиента подписки, мне нужно было убедиться, что свойство MaxAutoRenewDuration установлено правильно.

Я предполагал, что это свойство означает, что если вы определили 30 секунд для этого, что клиент подписки внутренне обновитсяблокировка каждые 30 секунд, таким образом, если ваш максимальный срок действия составляет, например, 5 минут, блокировка будет возобновлена, пока вы обрабатываете сообщение ...

Фактически, свойство на самом деле означает максимумвремя, в течение которого они блокируют обновление, произойдет для клиента подписки на внутреннем уровне.

Поэтому, если вы установите это значение равным 24 часам, например Timespan.FromHours(24), а ваша обработка займет 12 часов, он будет продлен.Однако, если вы установите это значение на 12 часов, используя Timespan.FromHours(12), и ваш код будет работать в течение 24, то, когда вы перейдете к завершению сообщения, это выдаст исключение lockLost (как я получал выше за более короткие интервалы!).

Одна вещь, которую я сделал, которую было легко реализовать, - это динамическое извлечение LockDuration из свойств подписки во время выполнения (все мои темы могут иметь разную конфигурацию) и применение MaxAutoRenewDuration соответствующим образом с помощью этого.

Пример кода:

sbNamespace.Topics.GetByName(“test”).Subscriptions.GetByName(“test”).LockDurationInSeconds

Примечание. Я использую пакет Azure.Management.Fluent для создания пространства sbNamespace.

Надеюсь, это поможет другим!

0 голосов
/ 19 декабря 2018

Я бы посоветовал вам установить более высокую продолжительность блокировки при подписке MaxAutoRenewDuration = TimeSpan.FromSeconds(xxxx), или вы можете просто использовать message.RenewLock().

Надеюсь, это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...