Как избежать многократного получения сообщений из очереди ServcieBus при использовании SDK WebJobs - PullRequest
0 голосов
/ 07 мая 2018

У меня есть WebJob со следующим обработчиком ServiceBus, использующим WebJobs SDK:

[Singleton("{MessageId}")]
public static async Task HandleMessagesAsync([ServiceBusTrigger("%QueueName%")] BrokeredMessage message, [ServiceBus("%QueueName%")]ICollector<BrokeredMessage> queue, TextWriter logger)
{
    using (var scope = Program.Container.BeginLifetimeScope())
    {
        var handler = scope.Resolve<MessageHandlers>();
        logger.WriteLine(AsInvariant($"Handling message with label {message.Label}"));

        // To avoid coupling Microsoft.Azure.WebJobs the return type is IEnumerable<T>
        var outputMessages = await handler.OnMessageAsync(message).ConfigureAwait(false);

        foreach (var outputMessage in outputMessages)
        {
            queue.Add(outputMessage);
        }
    }
}

Если предварительные условия для обработчика не выполнены, outputMessages содержит BrokeredMessage с теми же MessageId, Label и полезной нагрузкой, что и тот, который мы сейчас обрабатываем, но он содержит ScheduledEnqueueTimeUtc в будущее.

Идея состоит в том, что мы быстро завершаем обработку текущего сообщения и ожидаем повторной попытки, планируя новое сообщение в будущем.

Иногда, особенно когда в очереди больше сообщений, чем в режиме блокировки просмотра SDK, я вижу сообщения, дублирующиеся в очереди ServiceBus. Они имеют одинаковые MessageId, Label и полезную нагрузку, но разные SequenceNumber, EnqueuedTimeUtc и ScheduledEnqueueTimeUtc. Все они имеют количество доставки 1.

Глядя на мой код обработчика, это может произойти только в том случае, если я получил одно и то же сообщение несколько раз, выяснил, что мне нужно подождать, и создаю новое сообщение для обработки в будущем. Обработчик успешно завершает работу, поэтому исходное сообщение завершается.

Исходные сообщения являются уникальными. Также я поместил SingletonAttribute в обработчик сообщений, чтобы сообщения для одного и того же MessageId не могли использоваться разными обработчиками.

Почему несколько обработчиков запускаются с одним и тем же сообщением и как я могу предотвратить это?

Я использую Microsoft.Azure.WebJobs версию v2.1.0

Продолжительность моих обработчиков не более 17 с и в среднем 1 с. Продолжительность блокировки 1м. Тем не менее, моя лучшая теория заключается в том, что что-то с блокировкой сообщений (пере) не работает, поэтому, пока я обрабатываю обработчик, блокировка теряется, сообщение возвращается в очередь и используется в другой раз. Если оба обработчика увидят, что критический ресурс все еще занят, они оба поставят новое сообщение в очередь.

1 Ответ

0 голосов
/ 14 мая 2018

После небольшого количества экспериментов я выяснил причину и нашел обходной путь.

Если сообщение ServiceBus завершено, но временная блокировка не отменена, оно вернется в очередь в активном состоянии после истечения срока блокировки.

ServiceBus QueueClient, по-видимому, отменяет блокировку, как только получает следующее сообщение (или пакет сообщений).

Таким образом, если QueueClient, используемый SDK WebJobs, неожиданно завершает работу (например, из-за завершающегося процесса или перезапуска веб-приложения), все заблокированные сообщения возвращаются в очередь, даже если они были завершены.

В моем обработчике я сейчас завершаю сообщение вручную и также отменяю блокировку следующим образом:

public static async Task ProcessQueueMessageAsync([ServiceBusTrigger("%QueueName%")] BrokeredMessage message, [ServiceBus("%QueueName%")]ICollector<BrokeredMessage> queue, TextWriter logger)
{
    using (var scope = Program.Container.BeginLifetimeScope())
    {
        var handler = scope.Resolve<MessageHandlers>();
        logger.WriteLine(AsInvariant($"Handling message with label {message.Label}"));

        // To avoid coupling Microsoft.Azure.WebJobs the return type is IEnumerable<T>
        var outputMessages = await handler.OnMessageAsync(message).ConfigureAwait(false);

        foreach (var outputMessage in outputMessages)
        {
            queue.Add(outputMessage);
        }

        await message.CompleteAsync().ConfigureAwait(false);
        await message.AbandonAsync().ConfigureAwait(false);
    }
}

Таким образом, я не получаю сообщения обратно в очередь при перезагрузке.

...