Как работает MessageQueue.BeginReceive и как правильно его использовать? - PullRequest
4 голосов
/ 15 сентября 2011

В настоящее время у меня есть фоновая тема. В этой теме есть бесконечный цикл.

Этот цикл время от времени обновляет некоторые значения в базе данных, а затем прослушивает 1 секунду в MessageQueue (с queue.Receive(TimeSpan.FromSeconds(1))).

Пока сообщение не приходит, этот вызов внутренне генерирует MessageQueueException (Timeout), который перехватывается, а затем цикл продолжается. Если есть сообщение, вызов обычно возвращается и сообщение обрабатывается, после чего цикл продолжается.

Это приводит к лоту исключений первого шанса (каждую секунду, за исключением сообщения, которое нужно обработать), и это приводит к рассылке отладочного вывода, а также к сбоям в отладчике, когда я забыл исключить MessageQueueExceptions.

Итак, как асинхронная обработка MessageQueue должна выполняться правильно, при этом гарантируя, что, пока мое приложение работает, очередь отслеживается и база данных тоже обновляется время от времени. Конечно, поток здесь не должен использовать 100% ЦП.

Мне просто нужна большая картинка или намек на правильно выполненную асинхронную обработку.

Ответы [ 3 ]

4 голосов
/ 15 сентября 2011

Вместо того, чтобы зацикливаться в потоке, я бы рекомендовал зарегистрировать делегата для события ReceiveCompleted вашего MessageQueue, как описано здесь:

</p> <p>using System; using System.Messaging;</p> <p>namespace MyProject { /// /// Provides a container class for the example. /// public class MyNewQueue {</p> <pre><code> //************************************************** // Provides an entry point into the application. // // This example performs asynchronous receive operation // processing. //************************************************** public static void Main() { // Create an instance of MessageQueue. Set its formatter. MessageQueue myQueue = new MessageQueue(".\\myQueue"); myQueue.Formatter = new XmlMessageFormatter(new Type[] {typeof(String)}); // Add an event handler for the ReceiveCompleted event. myQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(MyReceiveCompleted); // Begin the asynchronous receive operation. myQueue.BeginReceive(); // Do other work on the current thread. return; } //************************************************** // Provides an event handler for the ReceiveCompleted // event. //************************************************** private static void MyReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult) { // Connect to the queue. MessageQueue mq = (MessageQueue)source; // End the asynchronous Receive operation. Message m = mq.EndReceive(asyncResult.AsyncResult); // Display message information on the screen. Console.WriteLine("Message: " + (string)m.Body); // Restart the asynchronous Receive operation. mq.BeginReceive(); return; } }

}

Источник: https://docs.microsoft.com/en-us/dotnet/api/system.messaging.messagequeue.receivecompleted?view=netframework-4.7.2

3 голосов
/ 15 сентября 2011

Рассматривали ли вы MessageEnumerator , который возвращается из MessageQueue.GetMessageEnumerator2 ?

  • Вы получаете динамическое содержимое очереди для проверки и удаления сообщений из очереди во время итерации.
  • Если сообщений нет, MoveNext() вернет false и вам не нужно перехватывать исключения первого шанса
  • Если после начала итерации появляются новые сообщения, они будут повторяться.over (если они ставятся после курсора).
  • Если перед курсором появляются новые сообщения, вы можете просто сбросить итератор или продолжить (если в данный момент вам не нужны сообщения с более низким приоритетом).
3 голосов
/ 15 сентября 2011

Вопреки комментарию Джейми Диксона, сценарий является исключительным.Обратите внимание на именование метода и его параметров: BeginReceive (TimeSpan timeout)

Если бы метод был назван BeginTryReceive, это было бы совершенно нормально, если бы не было получено никакого сообщения.Присвоение имени BeginReceive (или Receive для версии синхронизации) подразумевает, что сообщение должно поступить в очередь.То, что параметр TimeSpan называется timeout, также имеет значение, потому что время ожидания является исключительным.Тайм-аут означает, что ответ был ожидаемым, но не был дан, и вызывающий абонент решает прекратить ожидание и предполагает, что произошла ошибка.Когда вы вызываете BeginReceive / Receive с 1-секундным таймаутом, вы заявляете, что, если к тому времени в очередь не поступило ни одного сообщения, что-то должно быть не так, и нам нужно это обработать.

Способ, которым я бы реализовалэто, если я понимаю, что вы хотите сделать правильно, это:

  • Call BeginReceive либо с очень большим тайм-аутом, либо даже без тайм-аута, если я не вижу пустую очередь как ошибку.
  • Присоединить обработчик событий к событию ReceiveCompleted, которое 1) обрабатывает сообщение и 2) снова вызывает BeginReceive.
  • Я бы НЕ использовал бесконечный цикл.Это и плохая практика, и полностью избыточная при использовании асинхронных методов, таких как BeginReceive.
  • edit: Чтобы отказаться от очереди, которая не читается ни одним клиентом, попросите авторов очереди заглянуть вочередь, чтобы определить, «мертв» ли он.

edit: У меня есть другое предложение.Поскольку я не знаю деталей вашего заявления, я понятия не имею, возможно ли оно или целесообразно.Мне кажется, что вы в основном устанавливаете соединение между клиентом и сервером, используя очередь сообщений в качестве канала связи.Почему это «связь»?Потому что очередь не будет записана, если никто не слушает.Я думаю, это в значительной степени то, что есть связь.Не будет ли более уместным использовать сокеты или именованные каналы для передачи сообщений?Таким образом, клиенты просто закрывают объекты Stream, когда они закончили чтение, и серверы немедленно уведомляются.Как я уже сказал, я не знаю, может ли это работать на то, что вы делаете, но похоже на более подходящий канал связи.

...