Когда я получаю отключение от сети, QueueClient не будет повторно подключаться к служебной шине Azure при восстановлении сетевого подключения. С этого момента все сообщения просто остаются на сервере.
Я попытался изменить RetryPolicy без эффекта. Единственный способ восстановить соединение - это воссоздать новый клиент QueueClient. Код клиента основан на примерах Microsoft.
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Utils;
using WeakEvent;
using Message = Microsoft.Azure.ServiceBus.Message;
namespace Messaging
{
public class QueueClient
{
private readonly Microsoft.Azure.ServiceBus.QueueClient Client;
private readonly WeakEventSource<string> _OnReceived = new WeakEventSource<string>();
public event EventHandler<string> OnReceived
{
add => _OnReceived.Subscribe( value );
remove => _OnReceived.Unsubscribe( value );
}
public QueueClient( AzureClient client )
{
var Keys = client.RequestMessagingConnect( new MessagingRequest
{
ConnectionType = MessagingRequest.MESSAGING_CONNECTION_TYPE.DRIVER
} ).Result;
if( Keys.Connected )
{
var Pk = Encryption.Decrypt( Keys.Pk );
var Id = Encryption.Decrypt( Keys.Id );
Client = new Microsoft.Azure.ServiceBus.QueueClient( Pk, Id, retryPolicy: new RetryExponential( new TimeSpan( 0, 0, 10 ), new TimeSpan( 0, 0, 30 ), int.MaxValue ) );
// Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
var MessageHandlerOptions = new MessageHandlerOptions( ExceptionHandler.ExceptionReceivedHandler )
{
// Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
// Set it according to how many messages the application wants to process in parallel.
MaxConcurrentCalls = 1,
// Indicates whether the message pump should automatically complete the messages after returning from user callback.
// False below indicates the complete operation is handled by the user callback as in ProcessMessagesAsync().
AutoComplete = false
};
// Register the function that processes messages.
Client.RegisterMessageHandler( ProcessMessagesAsync, MessageHandlerOptions );
}
}
private async Task ProcessMessagesAsync( Message message, CancellationToken token )
{
// Process the message.
var Body = Encoding.UTF8.GetString( message.Body );
var Message = Protocol.Data.Message.FromJson( Body );
_OnReceived?.Raise( this, Message ) );
// Complete the message so that it is not received again.
// This can be done only if the queue Client is created in ReceiveMode.PeekLock mode (which is the default).
await Client.CompleteAsync( message.SystemProperties.LockToken );
// Note: Use the cancellationToken passed as necessary to determine if the queueClient has already been closed.
// If queueClient has already been closed, you can choose to not call CompleteAsync() or AbandonAsync() etc.
// to avoid unnecessary exceptions.
}
}
}