Обработка токена отмены у нескольких клиентов очереди шины обслуживания - PullRequest
0 голосов
/ 15 марта 2019

У меня есть настраиваемое количество потребителей очереди серверной шины в одном процессе.Код использует ReceiveAsync метод QueueClient класса и вызывает QueueClient.Close при отмене.

Работает довольно хорошо, но оказалось, чтоСуществует некоторая проблема с закрытием QueueClient - только один клиент завершает работу немедленно, все остальные зависают до истечения времени ожидания serverWaitTime .

Посмотрите на код и его вывод:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

public class Program
{
    private static void Main()
    {
        CancellationTokenSource source = new CancellationTokenSource();
        var cancellationToken = source.Token;
        var logger = new Logger();

        Task.Run(() =>
        {
            Task.Delay(TimeSpan.FromSeconds(10)).Wait();
            source.Cancel();
            logger.Log("Cancellation requested.");
        });

        string connectionString = "...";
        string queueName = "...";

        var workers = Enumerable.Range(1, 3).Select(i => new Worker(connectionString, queueName, logger));
        var tasks = workers.Select(worker => Task.Run(() => worker.RunAsync(cancellationToken), cancellationToken)).ToArray();
        Task.WaitAll(tasks);
        logger.Log("The end.");
    }
}

class Worker
{
    private readonly Logger _logger;
    private readonly QueueClient _queueClient;

    public Worker(string connectionString, string queueName, Logger logger)
    {
        _logger = logger;
        _queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName);
    }

    public async Task RunAsync(CancellationToken cancellationToken)
    {
        _logger.Log($"Worker {GetHashCode()} started.");
        using (cancellationToken.Register(() => _queueClient.Close()))
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    var message = await _queueClient.ReceiveAsync(TimeSpan.FromSeconds(20));
                    _logger.Log($"Worker {GetHashCode()}: Process message {message.MessageId}...");
                }
                catch (OperationCanceledException ex)
                {
                    _logger.Log($"Worker {GetHashCode()}: {ex.Message}");
                }
            }
        _logger.Log($"Worker {GetHashCode()} finished.");
    }
}

class Logger
{
    private readonly Stopwatch _stopwatch;

    public Logger()
    {
        _stopwatch = new Stopwatch();
        _stopwatch.Start();
    }

    public void Log(string message) => Console.WriteLine($"{_stopwatch.Elapsed}: {message}");
}

Вывод:

00:00:00.8125644: Worker 12547953 started.
00:00:00.8127684: Worker 45653674 started.
00:00:00.8127314: Worker 59817589 started.
00:00:10.4534961: Cancellation requested.
00:00:11.4912900: Worker 45653674: The operation cannot be performed because the entity has been closed or aborted.
00:00:11.4914054: Worker 45653674 finished.
00:00:22.3242631: Worker 12547953: The operation cannot be performed because the entity has been closed or aborted.
00:00:22.3244501: Worker 12547953 finished.
00:00:22.3243945: Worker 59817589: The operation cannot be performed because the entity has been closed or aborted.
00:00:22.3252456: Worker 59817589 finished.
00:00:22.3253535: The end.

Итак, как вы можете видеть, рабочий 45653674 немедленно остановился, а два других остановились только через 10 секунд.

1 Ответ

0 голосов
/ 15 марта 2019

В этой статье я нашел некоторую полезную информацию: https://developers.de/blogs/damir_dobric/archive/2013/12/03/service-bus-undocumented-scaling-tips-amp-tricks.aspx. Проблема исчезает, если каждый клиент очереди работает через свое собственное физическое соединение.

Поэтому, чтобы устранить проблему, необходимо заменить следующеекод:

_queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName);

с

var factory = MessagingFactory.CreateFromConnectionString(connectionString);
_queueClient = factory.CreateQueueClient(queueName);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...