.NET асинхронный MSMQ - PullRequest
       29

.NET асинхронный MSMQ

5 голосов
/ 30 марта 2011

Я не понимаю, где это идет не так. По сути, у меня есть программа, которая получает из очереди сообщений и обрабатывает сообщения. Программу можно остановить в любое время, и в этом случае цикл обработки сообщений завершает свою работу до выхода из программы. Я пытаюсь сделать это с помощью следующего кода:

private MessageQueue q;
private ManualResetEventSlim idle;

public void Start()
{
    idle = new ManualResetEventSlim();
    q.ReceiveCompleted += this.MessageQueue_ReceiveCompleted;    
    q.BeginReceive();
}    

public void Stop()
{ 
    this.q.Dispose();
    this.idle.Wait();    
}

private void MessageQueue_ReceiveCompleted(object sender, 
    ReceiveCompletedEventArgs e)
{
    Message inMsg;
    try
    {
        inMsg = e.Message;
    }
    catch (Exception ex)
    {
        this.idle.Set();
        return;
    }

    // Handle message

    this.q.BeginReceive();
}

Как мы надеемся, метод Stop удаляет очередь сообщений, а затем ожидает установки дескриптора ожидания ожидания (что должно произойти, когда при получении будет вызвано событие ReceiveCompleted, но свойство e.Message должно, кроме ).

Однако цикл сообщений просто продолжается! Я удалил очередь сообщений, но ей все равно удается прочитать из нее, и обработчик исключений не вызывается, то есть строка idle.Wait ждет вечно.

Насколько я понимаю, удаление очереди сообщений ДОЛЖНО завершать любое ожидающее получение и вызывать событие, но e.Message (или q.EndReceive) должно вызвать исключение. Разве это не так? Если нет, как еще можно безопасно выйти из цикла сообщений?

Спасибо

UPDATE:

Вот полный пример (предполагается, что очередь существует)

class Program
{
    static MessageQueue mq;
    static ManualResetEventSlim idleWH;

    static void Main(string[] args)
    {
        idleWH = new ManualResetEventSlim();

        Console.WriteLine("Opening...");
        using (mq = new MessageQueue(@".\private$\test"))
        {
            mq.Formatter = new XmlMessageFormatter(new Type[] { typeof(int) });
            mq.ReceiveCompleted += mq_ReceiveCompleted;

            for (int i = 0; i < 10000; ++i)
                mq.Send(i);

            Console.WriteLine("Begin Receive...");
            mq.BeginReceive();

            Console.WriteLine("Press ENTER to exit loop");
            Console.ReadLine();

            Console.WriteLine("Closing...");

            mq.Close();
        }

        Console.WriteLine("Waiting...");
        idleWH.Wait();

        Console.WriteLine("Press ENTER (ex)");
        //Console.ReadLine();
    }

    static void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
    {
        try
        {
            var msg = mq.EndReceive(e.AsyncResult);
            Console.Title = msg.Body.ToString();

            // Receive next message
            mq.BeginReceive();
        }
        catch (Exception ex)
        {
            idleWH.Set();
            return;
        }
    }
}

Ответы [ 3 ]

4 голосов
/ 30 марта 2011

Не совсем уверен, как вы сделали эту работу вообще.Вы должны вызвать MessageQueue.EndReceive () в событии.Только этот метод может вызвать исключение.Просмотрите пример кода MSDN для события ReceiveCompleted.И не поймайте Исключение, которое просто вызывает неопровержимый сбой.Поймать конкретное исключение, которое вы получаете, когда вы удаляете очередь, ObjectDisposedException.

1 голос
/ 11 марта 2013

Единственный способ получить эту работу - использовать транзакционную очередь.Любая нетранзакционная очередь кажется уязвимой для этого.Не ответ, но лучший совет, который я могу дать любому, кто найдет это.

0 голосов
/ 03 мая 2012
    private static volatile bool _shouldStop = false;

. , ,

            _shouldStop = true;
            mq.Close();

. , ,

        try
        {
            var msg = mq.EndReceive(e.AsyncResult);

            if ( _shouldStop)
            {
                idleWH.Set();
                return;
            }

            mq.BeginReceive();
        }

. , .

...