У меня есть WebJob со следующим обработчиком ServiceBus, использующим WebJobs SDK:
[Singleton("{MessageId}")]
public static async Task HandleMessagesAsync([ServiceBusTrigger("%QueueName%")] BrokeredMessage message, [ServiceBus("%QueueName%")]ICollector<BrokeredMessage> queue, TextWriter logger)
{
using (var scope = Program.Container.BeginLifetimeScope())
{
var handler = scope.Resolve<MessageHandlers>();
logger.WriteLine(AsInvariant($"Handling message with label {message.Label}"));
// To avoid coupling Microsoft.Azure.WebJobs the return type is IEnumerable<T>
var outputMessages = await handler.OnMessageAsync(message).ConfigureAwait(false);
foreach (var outputMessage in outputMessages)
{
queue.Add(outputMessage);
}
}
}
Если предварительные условия для обработчика не выполнены, outputMessages
содержит BrokeredMessage
с теми же MessageId
, Label
и полезной нагрузкой, что и тот, который мы сейчас обрабатываем, но он содержит ScheduledEnqueueTimeUtc
в будущее.
Идея состоит в том, что мы быстро завершаем обработку текущего сообщения и ожидаем повторной попытки, планируя новое сообщение в будущем.
Иногда, особенно когда в очереди больше сообщений, чем в режиме блокировки просмотра SDK, я вижу сообщения, дублирующиеся в очереди ServiceBus. Они имеют одинаковые MessageId
, Label
и полезную нагрузку, но разные SequenceNumber
, EnqueuedTimeUtc
и ScheduledEnqueueTimeUtc
. Все они имеют количество доставки 1.
Глядя на мой код обработчика, это может произойти только в том случае, если я получил одно и то же сообщение несколько раз, выяснил, что мне нужно подождать, и создаю новое сообщение для обработки в будущем. Обработчик успешно завершает работу, поэтому исходное сообщение завершается.
Исходные сообщения являются уникальными. Также я поместил SingletonAttribute
в обработчик сообщений, чтобы сообщения для одного и того же MessageId
не могли использоваться разными обработчиками.
Почему несколько обработчиков запускаются с одним и тем же сообщением и как я могу предотвратить это?
Я использую Microsoft.Azure.WebJobs
версию v2.1.0
Продолжительность моих обработчиков не более 17 с и в среднем 1 с. Продолжительность блокировки 1м. Тем не менее, моя лучшая теория заключается в том, что что-то с блокировкой сообщений (пере) не работает, поэтому, пока я обрабатываю обработчик, блокировка теряется, сообщение возвращается в очередь и используется в другой раз. Если оба обработчика увидят, что критический ресурс все еще занят, они оба поставят новое сообщение в очередь.