Как я могу опрашивать сообщения из очереди Solace (вместо поведения по умолчанию pushing)? - PullRequest
0 голосов
/ 10 января 2019

Я бы хотел написать модуль параллельного исполнения на основе Solace. И я использую запрос-ответ схему для этого.

У меня есть:

  • Несколько потребителей сообщений, которые публикуют сообщения в одной очереди.
  • Несколько производителей сообщений, которые читают очередь и создают ответные сообщения.
  • Время выполнения сообщения составляет от 10 секунд до 10 минут.
  • Тип доступа к очереди неисключительный (например, он выполняет циклический перебор между всеми потребителями).
  • Каждый производитель и потребитель является асинхронным, например, Solace API блокирует выполнение только во время соединения.

То, что я хотел бы иметь: , если программа работает с сообщением, другие сообщения не должны поступать. Это чрезвычайно важно, поскольку некоторые задачи блокируют выполнение на несколько минут, однако другие исполнители могут быть свободным через пару секунд.

Схема ниже может быть работоспособной (возможной), однако код блокировки появляется ниже. Я бы хотел этого избежать.

while(true)
{
    var inputMessage = flow.ReceiveMsg( /*timeout 1s*/1_000); // <--- blocking code, I'd like to avoid it

    flow.Ack(inputMessage.ADMessageId);

    var reply = await ProcessMessageAsync(inputMessage); // execute plus handle exceptions

    session.SendReply(inputMessage, reply)
}

1 Ответ

0 голосов
/ 11 января 2019

Сообщения отправляются только в приложения-потребители.

При этом желаемое поведение можно получить, установив для параметра "max-delivery-unacked-msgs-per-flow" в вашей очереди значение 1. Это означает, что каждому потребителю, связанному с очередью, разрешено иметь только 1 неподтвержденное сообщение. Следующее сообщение будет отправлено потребителю только после подтверждения сообщения.

Подробности об этой функции можно найти здесь .

Обратите внимание, что ваш фрагмент кода не является действительным. IFlow.ReceiveMsg используется только в транзакционных сеансах, в которых для подтверждения сообщений используется ITransactedSession.Commit.

...