У меня есть сценарий, в котором я вызываю RegisterMessageHandler класса SubscriptionClient Azure Библиотека служебной шины. В основном я использую подход, основанный на триггере, при получении сообщений от Service Bus в одной из моих служб в среде Service Fabri c в качестве службы без сохранения состояния. Поэтому я не закрываю объект subscriptionClient немедленно, а оставляю его открытым в течение всего срока службы, чтобы он продолжал получать сообщения из azure тем служебной шины.
А когда службе необходимо закрыто (по некоторым причинам), я хочу обработать токен отмены, передаваемый в службу Service Fabri c.
Мой вопрос заключается в том, как я могу обработать токен отмены в методе RegisterMessageHandler, который получает вызывается при получении нового сообщения? Также я хочу обработать закрытие клиента подписки «Изящно», т.е. я хочу, чтобы, если сообщение уже обрабатывалось, то я хотел, чтобы это сообщение было обработано полностью, а затем я хочу закрыть соединение. Ниже приведен код, который я использую.
В настоящее время мы придерживаемся следующего подхода: 1. Блокировка процесса сообщения с использованием семафорной блокировки и снятие блокировки в блоке finally. 2. Вызов метода cancellationToken.Register для обработки маркера отмены всякий раз, когда выполняется отмена. Снятие блокировки в методе Register.
public class AzureServiceBusReceiver
{
private SubscriptionClient subscriptionClient;
private static Semaphore semaphoreLock;
public AzureServiceBusReceiver(ServiceBusReceiverSettings settings)
{
semaphoreLock = new Semaphore(1, 1);
subscriptionClient = new SubscriptionClient(
settings.ConnectionString, settings.TopicName, settings.SubscriptionName, ReceiveMode.PeekLock);
}
public void Receive(
CancellationToken cancellationToken)
{
var options = new MessageHandlerOptions(e =>
{
return Task.CompletedTask;
})
{
AutoComplete = false,
};
subscriptionClient.RegisterMessageHandler(
async (message, token) =>
{
semaphoreLock.WaitOne();
if (subscriptionClient.IsClosedOrClosing)
return;
CancellationToken combinedToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, token).Token;
try
{
// message processing logic
}
catch (Exception ex)
{
await subscriptionClient.DeadLetterAsync(message.SystemProperties.LockToken);
}
finally
{
semaphoreLock.Release();
}
}, options);
cancellationToken.Register(() =>
{
semaphoreLock.WaitOne();
if (!subscriptionClient.IsClosedOrClosing)
subscriptionClient.CloseAsync().GetAwaiter().GetResult();
semaphoreLock.Release();
return;
});
}
}