Как обращаться с токеном отмены в azure служебной шине topi c приемника? - PullRequest
0 голосов
/ 07 января 2020

У меня есть сценарий, в котором я вызываю RegisterMessageHandler класса SubscriptionClient Azure Библиотека служебной шины. В основном я использую подход, основанный на триггере, при получении сообщений от Service Bus в одной из моих служб в среде Service Fabri c в качестве службы без сохранения состояния. Поэтому я не закрываю объект subscriptionClient немедленно, а оставляю его открытым в течение всего срока службы, чтобы он продолжал получать сообщения из azure тем служебной шины.

А когда службе необходимо закрыто (по некоторым причинам), я хочу обработать токен отмены, передаваемый в службу Service Fabri c.

Мой вопрос заключается в том, как я могу обработать токен отмены в методе RegisterMessageHandler, который получает вызывается при получении нового сообщения? Также я хочу обработать закрытие клиента подписки «Изящно», т.е. я хочу, чтобы, если сообщение уже обрабатывалось, то я хотел, чтобы это сообщение было обработано полностью, а затем я хочу закрыть соединение. Ниже приведен код, который я использую.

В настоящее время мы придерживаемся следующего подхода: 1. Блокировка процесса сообщения с использованием семафорной блокировки и снятие блокировки в блоке finally. 2. Вызов метода cancellationToken.Register для обработки маркера отмены всякий раз, когда выполняется отмена. Снятие блокировки в методе Register.

public class AzureServiceBusReceiver
{
  private SubscriptionClient subscriptionClient;
  private static Semaphore semaphoreLock;

public AzureServiceBusReceiver(ServiceBusReceiverSettings settings)
{
    semaphoreLock = new Semaphore(1, 1);
    subscriptionClient = new SubscriptionClient(
        settings.ConnectionString, settings.TopicName, settings.SubscriptionName, ReceiveMode.PeekLock);
}

public void Receive(
    CancellationToken cancellationToken)
{
    var options = new MessageHandlerOptions(e =>
    {

        return Task.CompletedTask;
    })
    {
        AutoComplete = false,

    };

    subscriptionClient.RegisterMessageHandler(
        async (message, token) =>
        {
            semaphoreLock.WaitOne();
            if (subscriptionClient.IsClosedOrClosing)
                return;
            CancellationToken combinedToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, token).Token;
            try
            {
                // message processing logic
            }
            catch (Exception ex)
            {
                await subscriptionClient.DeadLetterAsync(message.SystemProperties.LockToken);
            }
            finally
            {
                semaphoreLock.Release();
            }
        }, options);


    cancellationToken.Register(() =>
    {
        semaphoreLock.WaitOne();
        if (!subscriptionClient.IsClosedOrClosing)
            subscriptionClient.CloseAsync().GetAwaiter().GetResult();
        semaphoreLock.Release();
        return;
    });
} 
}

1 Ответ

0 голосов
/ 07 января 2020

Реализуйте клиент сообщений как ICommunicationListener, поэтому, когда служба закрыта, вы можете заблокировать вызов до завершения обработки сообщения. Не используйте статический семафор c, чтобы вы могли безопасно повторно использовать код в своих проектах.

Здесь является примером того, как вы можете это сделать.

И вот пакет Nuget, созданный этим кодом.

И вы можете внести свой вклад!

...