Периодически получая ошибку несанкционированного доступа на RenewToken с Azure Слушателем очереди служебной шины - PullRequest
0 голосов
/ 27 апреля 2020

Я периодически получаю Microsoft. Azure .ServiceBus.ServiceBusException (сообщение ниже с удалением конфиденциальной информации) в моем приемнике очереди. Ключ SAS имеет доступ к отправке / прослушиванию, и ошибка кажется несущественной, поскольку обработка продолжается в обычном режиме. Тем не менее, сообщение создает проблемы сигнал-шум в моих панелях (получая 10-70 ошибок в день). Любые идеи о том, почему это происходит? Слушатель работает в Azure службе приложений, но я не думаю, что это имеет значение. Я настроил мою логи повторных попыток c, чтобы использовать RetryExponential с откатом от 1 секунды до 1 минуты с 5 повторными попытками.

Запрос руководства от разработчиков SDK

пакетов

Net Core 3.1

Microsoft.Azure.ServiceBus, Version=4.1.3.0, Culture=neutral, PublicKeyToken=7e34167dcc6d6d8c

Сообщение об ошибке

Ссылка ' xxx; xxx: xxx: xxx: источник (адрес: xxx): xxx 'принудительно отсоединен. Код: RenewToken. Детали: Несанкционированный доступ. Требование «прослушать» необходимо для выполнения этой операции. Ресурс: 'sb: //xxx.servicebus.windows.net/xxx' .. Идентификатор отслеживания: xxx, SystemTracker: xxx, метка времени: 2020-04-27T09: 36: 04 Ссылка 'xxx; xxx: xxx: xxx : источник (адрес: xxx): xxx 'принудительно отсоединен. Код: RenewToken. Детали: Несанкционированный доступ. Требование «прослушать» необходимо для выполнения этой операции. Ресурс: 'sb: //xxx.servicebus.windows.net/xxx' .. Идентификатор отслеживания: xxx, SystemTracker: xxx, метка времени: 2020-04-27T09: 36: 04

Источник

internal delegate TClient ClientFactory<out TClient>(string connectionString, string entityPath,
    RetryPolicy retryPolicy);

internal delegate Task OnMessageCallback<in TMessage>(TMessage message,
    CancellationToken cancellationToken = default) where TMessage : ICorrelative;

internal sealed class ReceiverClientWrapper<TMessage> : IReceiverClientWrapper<TMessage>
    where TMessage : ICorrelative
{
    // ReSharper disable once StaticMemberInGenericType
    private static readonly Regex TransientConnectionErrorRegex =
        new Regex(
            @"(The link '([a-f0-9-]+);([0-9]*:)*source\(address:([a-z0-9_]+)\):([a-z0-9_]+)' is force detached. Code: RenewToken. Details: Unauthorized access. 'Listen' claim\(s\) are required to perform this operation. Resource: 'sb:\/\/([a-z0-9-_.\/]+)'.. TrackingId:([a-z0-9_]+), SystemTracker:([a-z0-9]+), Timestamp:([0-9]{4}(-[0-9]{2}){2}T([0-9]{2}:){2}[0-9]{2}) )+",
            RegexOptions.Compiled | RegexOptions.Multiline | RegexOptions.IgnoreCase);

    private readonly IReceiverClient _receiverClient;
    private readonly IMessageConverter<TMessage> _messageConverter;
    private readonly ILogger _logger;
    private readonly int _maximumConcurrency;

    public ReceiverClientWrapper(IReceiverClient receiverClient, IMessageConverter<TMessage> messageConverter,
        ILogger logger, int maximumConcurrency)
    {
        _receiverClient = receiverClient;
        _messageConverter = messageConverter;
        _logger = logger;
        _maximumConcurrency = maximumConcurrency;
    }

    public Task SubscribeAsync(OnMessageCallback<TMessage> onMessageCallback,
        OnFailureCallback onFailureCallback, CancellationToken cancellationToken = default)
    {
        var messageHandlerOptions = CreateMessageHandlerOptions(onFailureCallback, cancellationToken);

        async Task Handler(Message message, CancellationToken token)
        {
            var convertedMessage = _messageConverter.Convert(message);

            await onMessageCallback(convertedMessage, cancellationToken);
            await _receiverClient.CompleteAsync(message.SystemProperties.LockToken);
        }

        _receiverClient.RegisterMessageHandler(Handler, messageHandlerOptions);

        return Task.CompletedTask;
    }

    private MessageHandlerOptions CreateMessageHandlerOptions(OnFailureCallback onFailureCallback,
        CancellationToken cancellationToken)
    {
        async Task HandleExceptionAsync(ExceptionReceivedEventArgs arguments)
        {
            var exception = arguments.Exception;

            if (TransientConnectionErrorRegex.IsMatch(exception.Message))
            {
                _logger.LogWarning(exception, @"Transient connectivity error occurred");

                return;
            }

            await onFailureCallback(exception, cancellationToken);
        }

        return new MessageHandlerOptions(HandleExceptionAsync)
        {
            AutoComplete = false,
            MaxConcurrentCalls = _maximumConcurrency
        };
    }

    public async ValueTask DisposeAsync()
    {
        await _receiverClient.CloseAsync();
    }
}

internal sealed class SenderClientWrapper<TMessage> : ISenderClientWrapper<TMessage> where TMessage : ICorrelative
{
    private readonly ISenderClient _senderClient;
    private readonly IMessageConverter<TMessage> _messageConverter;

    public SenderClientWrapper(ISenderClient senderClient, IMessageConverter<TMessage> messageConverter)
    {
        _senderClient = senderClient;
        _messageConverter = messageConverter;
    }

    public Task SendAsync(TMessage message, CancellationToken cancellationToken = default)
    {
        var internalMessage = _messageConverter.Convert(message);

        return _senderClient.SendAsync(internalMessage);
    }

    public Task SendAsync(IEnumerable<TMessage> messages, CancellationToken cancellationToken = default)
    {
        var internalMessages = messages
            .Select(_messageConverter.Convert)
            .ToImmutableArray();

        return _senderClient.SendAsync(internalMessages);
    }

    public async ValueTask DisposeAsync()
    {
        await _senderClient.CloseAsync();
    }
}

internal abstract class AbstractClientWrapperFactory
{
    private const int MaximumRetryCount = 5;
    private static readonly TimeSpan MinimumRetryBackOff = TimeSpan.FromSeconds(1);
    private static readonly TimeSpan MaximumRetryBackOff = TimeSpan.FromMinutes(1);

    protected AbstractClientWrapperFactory(IOptions<MessageBusConfiguration> options)
    {
        Options = options;
    }

    protected IOptions<MessageBusConfiguration> Options { get; }

    protected static string GetEntityPath<TMessage>() where TMessage : class
    {
        var messageAttribute = typeof(TMessage).GetCustomAttribute<AbstractMessageAttribute>();

        if (messageAttribute == null)
        {
            throw new ArgumentException($@"Message requires {nameof(AbstractMessageAttribute)}");
        }

        return messageAttribute.EntityName;
    }

    protected TClient CreateClientEntity<TMessage, TClient>(ClientFactory<TClient> clientFactory)
        where TMessage : class
    {
        var entityPath = GetEntityPath<TMessage>();
        var retryPolicy = CreateRetryPolicy();

        return clientFactory(Options.Value.ConnectionString, entityPath, retryPolicy);
    }

    protected static IQueueClient QueueClientFactory(string connectionString, string entityPath,
        RetryPolicy retryPolicy)
    {
        return new QueueClient(connectionString, entityPath, retryPolicy: retryPolicy);
    }

    private static RetryPolicy CreateRetryPolicy()
    {
        return new RetryExponential(MinimumRetryBackOff, MaximumRetryBackOff, MaximumRetryCount);
    }
}

internal sealed class SenderClientWrapperFactory : AbstractClientWrapperFactory, ISenderClientWrapperFactory
{
    private readonly IMessageConverterFactory _messageConverterFactory;

    public SenderClientWrapperFactory(IMessageConverterFactory messageConverterFactory,
        IOptions<MessageBusConfiguration> options) : base(options)
    {
        _messageConverterFactory = messageConverterFactory;
    }

    public ISenderClientWrapper<TEvent> CreateTopicClient<TEvent>() where TEvent : class, IEvent
    {
        return CreateWrapper<TEvent, ITopicClient>(TopicClientFactory);
    }

    public ISenderClientWrapper<TRequest> CreateQueueClient<TRequest>() where TRequest : class, IRequest
    {
        return CreateWrapper<TRequest, IQueueClient>(QueueClientFactory);
    }

    private ISenderClientWrapper<TMessage> CreateWrapper<TMessage, TClient>(ClientFactory<TClient> clientFactory)
        where TMessage : class, ICorrelative
        where TClient : ISenderClient
    {
        var clientEntity = CreateClientEntity<TMessage, TClient>(clientFactory);
        var messageConverter = _messageConverterFactory.Create<TMessage>();

        return new SenderClientWrapper<TMessage>(clientEntity, messageConverter);
    }

    private static ITopicClient TopicClientFactory(string connectionString, string entityPath,
        RetryPolicy retryPolicy)
    {
        return new TopicClient(connectionString, entityPath, retryPolicy);
    }
}

internal sealed class ReceiverClientWrapperFactory : AbstractClientWrapperFactory, IReceiverClientWrapperFactory
{
    private readonly IMessageConverterFactory _messageConverterFactory;
    private readonly ILogger<ReceiverClientWrapperFactory> _logger;

    public ReceiverClientWrapperFactory(IOptions<MessageBusConfiguration> options,
        IMessageConverterFactory messageConverterFactory,
        ILogger<ReceiverClientWrapperFactory> logger) : base(options)
    {
        _messageConverterFactory = messageConverterFactory;
        _logger = logger;
    }

    public IReceiverClientWrapper<TEvent> CreateTopicClient<TEvent>() where TEvent : class, IEvent
    {
        return CreateReceiverClientWrapper<TEvent, ISubscriptionClient>(SubscriptionClientFactory);
    }

    public IReceiverClientWrapper<TRequest> CreateQueueClient<TRequest>() where TRequest : class, IRequest
    {
        return CreateReceiverClientWrapper<TRequest, IQueueClient>(QueueClientFactory);
    }

    private IReceiverClientWrapper<TMessage> CreateReceiverClientWrapper<TMessage, TClient>(
        ClientFactory<TClient> clientFactory)
        where TMessage : class, ICorrelative
        where TClient : IReceiverClient
    {
        var clientEntity = CreateClientEntity<TMessage, TClient>(clientFactory);
        var messageConverter = _messageConverterFactory.Create<TMessage>();

        return new ReceiverClientWrapper<TMessage>(clientEntity, messageConverter, _logger,
            Options.Value.MaximumConcurrency);
    }

    private ISubscriptionClient SubscriptionClientFactory(string connectionString, string entityPath,
        RetryPolicy retryPolicy)
    {
        return new SubscriptionClient(connectionString, entityPath, Options.Value.SubscriberName,
            retryPolicy: retryPolicy);
    }
}

internal sealed class RequestService<TRequest> : IRequestService<TRequest> where TRequest : class, IRequest
{
    private readonly Lazy<ISenderClientWrapper<TRequest>> _senderClient;
    private readonly Lazy<IReceiverClientWrapper<TRequest>> _receiverClient;

    public RequestService(ISenderClientWrapperFactory senderClientWrapperFactory,
        IReceiverClientWrapperFactory receiverClientWrapperFactory)
    {
        _senderClient =
            new Lazy<ISenderClientWrapper<TRequest>>(senderClientWrapperFactory.CreateQueueClient<TRequest>,
                LazyThreadSafetyMode.PublicationOnly);

        _receiverClient
            = new Lazy<IReceiverClientWrapper<TRequest>>(receiverClientWrapperFactory.CreateQueueClient<TRequest>,
                LazyThreadSafetyMode.PublicationOnly);
    }

    public Task PublishRequestAsync(TRequest requestMessage, CancellationToken cancellationToken = default)
    {
        return _senderClient.Value.SendAsync(requestMessage, cancellationToken);
    }

    public Task PublishRequestAsync(IEnumerable<TRequest> requestMessages,
        CancellationToken cancellationToken = default)
    {
        return _senderClient.Value.SendAsync(requestMessages, cancellationToken);
    }

    public Task SubscribeAsync(OnRequestCallback<TRequest> onRequestCallback, OnFailureCallback onFailureCallback,
        CancellationToken cancellationToken = default)
    {
        return _receiverClient
            .Value
            .SubscribeAsync((message, token) => onRequestCallback(message, cancellationToken), onFailureCallback,
                cancellationToken);
    }

    public async ValueTask DisposeAsync()
    {
        if (_senderClient.IsValueCreated)
        {
            await _senderClient.Value.DisposeAsync();
        }

        if (_receiverClient.IsValueCreated)
        {
            await _receiverClient.Value.DisposeAsync();
        }
    }

    public Task ThrowIfNotReadyAsync(CancellationToken cancellationToken = default)
    {
        return _senderClient.Value.SendAsync(ImmutableArray<TRequest>.Empty, cancellationToken);
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...