Службы Windows, обменивающиеся данными через MSMQ. Нужна ли служебная шина? - PullRequest
2 голосов
/ 08 октября 2011

У меня есть эта проблема, когда система содержит узлы (службы Windows), которые отправляют сообщения для обработки, и другие, которые извлекают сообщения и обрабатывают их.

Это было разработано таким образом, что push-узлы балансируют нагрузку между очередями, поддерживая циклический список очередей и чередующиеся очереди после каждой отправки. Поэтому сообщение 1 будет отправлено в очередь 1, сообщение 2 - в очередь 2 и т. Д. До сих пор эта часть работала отлично.

На уровне извлечения сообщений мы разработали его так, чтобы сообщения извлекались аналогичным образом - сначала из очереди 1, затем из очереди 2 и т. Д. Теоретически, каждый узел извлечения находится на отдельной машине и на практике пока Он прослушивал только одну очередь. Но недавно появившееся требование заставило нас создать в сети машину, которая прослушивает более одной очереди: один узел, который обычно очень занят и заполнен миллионами сообщений, а другой, как правило, содержит всего несколько сообщений.

Проблема, с которой мы сталкиваемся, заключается в том, что способ, которым мы изначально проектировали узлы извлечения, переходит из очереди в очередь, пока не будет найдено сообщение. Если время ожидания истекло (скажем, через секунду), оно переходит к следующей очереди.

Это больше не работает, потому что Q1 (заполненный миллионами сообщений) будет задерживаться примерно на секунду для каждого сообщения, так как после каждого извлечения из Q1 мы будем запрашивать сообщение у Q2 (и если оно не будет содержать, мы будем ждать секунду ).

Итак, это выглядит так:

Q1 содержит 10 сообщений, а Q2 не содержит

  • Узел Pull запрашивает сообщение от Q1
  • Q1 немедленно возвращает сообщение
  • Узел Pull запрашивает сообщение от Q1
  • ------------ Ожидание секунды ------------- (Q2 пусто и время ожидания запроса)
  • Узел Pull запрашивает сообщение от Q1
  • Q1 немедленно возвращает сообщение
  • Узел Pull запрашивает сообщение от Q1
  • ------------ Ожидание секунды ------------- (Q2 пуст и время запроса истекло)

и т.д.

Так что это явно неправильно.

Я думаю, я ищу лучшее архитектурное решение здесь. Обработка сообщений не обязательно должна осуществляться в режиме реального времени, насколько это возможно, но должна быть надежной, и ни одно сообщение не должно быть потеряно!

Мне бы хотелось услышать ваше мнение по этой проблеме.

Спасибо заранее Яннис

Ответы [ 2 ]

1 голос
/ 26 октября 2011

Я закончил тем, что создал набор потоков - по одному для каждого msmq, который должен быть обработан.В конструкторе я инициализирую эти потоки:

Storages.ForEach(queue =>
        {
            Task task = Task.Factory.StartNew(() =>
            {
                LoggingManager.LogInfo("Starting a local thread to read in mime messages from queue " + queue.Name, this.GetType());
                while (true)
                {
                    WorkItem mime = queue.WaitAndRetrieve();
                    if (mime != null)
                    {
                        _Semaphore.WaitOne();
                        _LocalStorage.Enqueue(mime);

                        lock (_locker) Monitor.Pulse(_locker);

                        LoggingManager.LogDebug("Adding no. " + _LocalStorage.Count + " item in queue", this.GetType());
                    }
                }
            });
        });
  • _LocalStorage - это потокобезопасная реализация очереди (ConcurrentQueue, представленная в .NET 4.0)

  • Семафор - это счетный семафор для управления вставками в _LocalStorage._LocalStorage - это в основном буфер принимаемых сообщений, но мы не хотим, чтобы он становился слишком большим, пока узлы обработки заняты работой.В результате мы можем получить ВСЕ сообщения msmq в этом _LocalStorage, но заняты обработкой только 5 из них или около того.Это плохо как с точки зрения устойчивости (если программа неожиданно завершает работу, мы теряем все эти сообщения), так и с точки зрения производительности, поскольку потребление памяти для хранения всех этих элементов в памяти будет огромным.Поэтому нам нужно контролировать, сколько элементов мы храним в очереди буфера _LocalStorage.

  • Мы пульсируем потоки, ожидающие работы (см. Ниже), чтобы новый элемент был добавлен в очередь с помощью простого монитора.Pulse

Код, который удаляет рабочие элементы из очереди, выглядит следующим образом:

lock (_locker)
            if (_LocalStorage.Count == 0) 
                Monitor.Wait(_locker);

        WorkItem result;
        if (_LocalStorage.TryDequeue(out result))
        {
            _Semaphore.Release();
            return result;
        }

        return null;

Надеюсь, это поможет кому-то разобраться в аналогичной проблеме.

1 голос
/ 08 октября 2011

Может быть, вы могли бы использовать событие ReceiveCompleted в классе MessageQueue? Тогда не нужно опрашивать.

...