Я бы хотел написать модуль параллельного исполнения на основе Solace. И я использую запрос-ответ схему для этого.
У меня есть:
- Несколько потребителей сообщений, которые публикуют сообщения в одной очереди.
- Несколько производителей сообщений, которые читают очередь и создают ответные сообщения.
- Время выполнения сообщения составляет от 10 секунд до 10 минут.
- Тип доступа к очереди неисключительный (например, он выполняет циклический перебор между всеми потребителями).
- Каждый производитель и потребитель является асинхронным, например, Solace API блокирует выполнение только во время соединения.
То, что я хотел бы иметь: , если программа работает с сообщением, другие сообщения не должны поступать. Это чрезвычайно важно, поскольку некоторые задачи блокируют выполнение на несколько минут, однако другие исполнители могут быть свободным через пару секунд.
Схема ниже может быть работоспособной (возможной), однако код блокировки появляется ниже. Я бы хотел этого избежать.
while(true)
{
var inputMessage = flow.ReceiveMsg( /*timeout 1s*/1_000); // <--- blocking code, I'd like to avoid it
flow.Ack(inputMessage.ADMessageId);
var reply = await ProcessMessageAsync(inputMessage); // execute plus handle exceptions
session.SendReply(inputMessage, reply)
}