Использование сообщений от Azure Сервисная шина в нескольких процессах - PullRequest
0 голосов
/ 23 января 2020

Я разрабатываю систему на основе Azure Service Bus для быстрого запуска и забывания через API и асинхронной c обработки большого количества сообщений фоновыми службами через topi c. В контексте этого вопроса у topi c есть одна подписка, почему это могла быть очередь. По другим причинам я хотел бы сохранить это как топи c.

. Я недавно перенес код из приложения. NET framework, используя пакет WindowsAzure.ServiceBus, в основной пакет. NET, используя Microsoft.Azure.ServiceBus пакет. Чтобы обработать большое количество сообщений, я использую класс MessageReceiver следующим образом:

var connString = "...";
var subscriptionPath = EntityNameHelper.FormatSubscriptionPath("topic", "subscription");
var messageReceiver = new MessageReceiver(connString, subscriptionPath);
while (...)
{
    var messages = await messageReceiver.ReceiveAsync(10, TimeSpan.FromSeconds(5));
    ...
}

Для простоты я скрыл ряд деталей. Как тот факт, что мое приложение запускает 5 потоков и обрабатывает сообщения в каждом потоке, используя один и тот же экземпляр messageReceiver.

Обычно у меня запущено более одного экземпляра этого приложения, которое распространяется как по потокам, так и по процессам. И я считаю, что мы наконец-то добрались до кода моего вопроса. После перехода на. NET Core и новый пакет NuGet я заметил, что только одно приложение обрабатывает сообщения одновременно. Когда я открываю две консоли windows и запускаю процесс в каждом окне, я вижу, что приложение в окне 1 начинает обработку. Приложение в windows 2 ничего не обрабатывает. Через несколько секунд приложение в windows 1 прекращает обработку, а приложение в окне 2 начинает обработку. Через некоторое время он переключается обратно. В коммутаторе нет реального шаблона, но все мои сообщения успешно обрабатываются.

Существует ли какое-то ограничение в MessageReceiver, которое позволяет максимальному количеству потоков обрабатывать сообщения из одной и той же подписки или что-то вроде что?

1 Ответ

1 голос
/ 23 января 2020

Я не знаю каких-либо ограничений с MessageReceiver, когда дело доходит до количества потоков. Новая библиотека была оптимизирована, чтобы использовать преимущества параллелизма без использования потоков (асинхронный код). Технически, вы могли бы работать с одним потоком и иметь несколько одновременных задач получения. Другой подход заключается в использовании обработчика сообщений, предоставляемого QueueClient и SubscriptionClient, который позволяет легко указывать параллелизм для обработки нескольких сообщений, но позволяет получать одно сообщение для одновременного обратного вызова (без пакетирования).

Посредник предоставляет как можно больше сообщений за один звонок первому конкурирующему потребителю. Если сообщений недостаточно, все сообщения будут переданы одному (или нескольким первым) потребителям. Там нет кругового и справедливого распределения. Это работает, как и ожидалось.

...