Обработка предварительно выбранных сообщений в ServiceBus QueueClient при корректном завершении работы - PullRequest
0 голосов
/ 24 января 2019

Мне нужно создать высокопроизводительный клиент очереди ServiceBus, который может обрабатывать тысячи сообщений в секунду.Единственный способ, которым мне удалось это сделать, - это комбинация ReceiveMode.ReceiveAndDelete и PrefetchCount.

var client = new QueueClient(connectionString, queueName, ReceiveMode.ReceiveAndDelete);
client.PrefetchCount = 1000;
client.RegisterMessageHandler(ProcessMessagesAsync, new MessageHandlerOptions());
await Task.Delay(10000);   // wait for 10s while some messages are processed
await client.CloseAsync();

Надежность не критична, поэтому я могу позволить себе потерять сообщения в случае сбоя моего приложения.Тем не менее, я бы хотел избежать потери сообщений во время изящных отключений, таких как вызов CloseAsync выше.Я особенно хотел бы избежать потери сотен предварительно выбранных сообщений, которые находятся в локальном кэше.Есть ли способ заставить клиента ждать обработки предварительно выбранных сообщений, но прекратить получать новые?

1 Ответ

0 голосов
/ 28 января 2019

Мне не удалось найти подходящий обходной путь для этого; Похоже, это является неотъемлемым ограничением в дизайне QueueClient. Однако я обнаружил, что MessageReceiver предлагает такую ​​же функциональность , плюс пакетное завершение сообщений, что обеспечивает надежное высокопроизводительное потребление сообщений (около 500 мсг / с).

var messageReceiver = new MessageReceiver(
    connectionString, 
    entityPath, 
    ReceiveMode.PeekLock, 
    prefetchCount: 1000);

var processedLockTokens = new List<string>();

while (true)
{
    var messages = await messageReceiver.ReceiveAsync(maxMessageCount: 1000, operationTimeout: TimeSpan.FromSeconds(5));
    if (messages == null)
        continue;

    try
    {
        foreach (var message in messages)
        {
            // Process message.

            processedLockTokens.Add(message.SystemProperties.LockToken);

            if (isCancellationRequested)
                return;
        }
    }
    finally
    {
        if (processedLockTokens.Any())
        {
            await messageReceiver.CompleteAsync(processedLockTokens);

            processedLockTokens.Clear();
        }
    }
}
...