IBM-MQ Reader в .net XMS для подтверждения обработанных сообщений один за другим - PullRequest
1 голос
/ 30 ноября 2011

Я реализую компонент, который считывает все сообщения из определенной очереди, поскольку они доступны, но должен асинхронно удалять сообщения из очереди только после того, как содержимое сообщения было обработано и сохранено. Мы читаем сообщения быстрее, чем мы их подтверждаем (например, мы могли прочитать, прочитав 10 сообщений, прежде чем мы готовы подтвердить первое). Текущая реализация использует XMS API, но мы можем переключиться на MQI, если XMS не подходит для этих целей.

Мы попробовали два подхода для решения этой проблемы, но оба имеют недостатки, которые делают их неприемлемыми. Я надеялся, что кто-то может предложить лучший способ.

Первая реализация использует IMessageConsumer в выделенном потоке, чтобы читать все сообщения и публиковать их содержимое по мере их доступности. Когда сообщение обработано, вызывается метод message.Acknowledge(). Сессия создается с AcknowledgeMode.ClientAcknowledge. Проблема с этим подходом состоит в том, что согласно документации, это подтверждает (и удаляет) ВСЕ неподтвержденные сообщения, которые были получены. В приведенном выше примере это будет означать, что все 10 прочитанных сообщений будут подтверждены первым вызовом. Так что это на самом деле не решает проблему. Из-за необходимой нам скорости чтения мы не можем реально изменить это решение, чтобы дождаться подтверждения первого сообщения перед прочтением второго и т. Д.

Вторая реализация использует IQueueBrowser в выбранном потоке для чтения всех сообщений и публикации их содержимого. Это не удаляет сообщения из очереди во время чтения. Затем отдельный выделенный поток ожидает (в BlockingQueue) идентификаторы сообщений JMS сообщений, которые были обработаны. Для каждого из них он затем создает выделенный IMessageConsumer (используя селектор сообщений с JMSMessageID), чтобы прочитать сообщение и подтвердить его. (Такое соединение IQueueBrowser с выделенным IMessageConsumer рекомендуется в разделе документации XMS по Браузеры очередей .) Этот метод работает должным образом, но, как можно было бы предположить, он слишком интенсивно использует процессор на сервере MQ.

1 Ответ

3 голосов
/ 02 декабря 2011

Оба метода, предложенные в вопросе, по-видимому, полагаются на один экземпляр приложения.Что не так с использованием нескольких экземпляров приложения, транзакционных сеансов и COMMIT?Все отчеты о производительности (это SupportPacs с именами, такими как MP **) показывают, что пропускная способность максимизируется для нескольких экземпляров приложения, а горизонтальное масштабирование является одним из наиболее часто используемых подходов в вашем сценарии.

Дизайн для этого будет либо несколько экземпляров приложения или несколько потоков в одном приложении.Ключ к правильной работе должен помнить, что транзакции ограничены дескриптором соединения.Это означает, что многопоточное приложение должно отправлять отдельный поток для каждого экземпляра соединения, а сообщения читаются в одном потоке.

Поток процесса заключается в том, что при использовании транзакционного сеанса приложение выполняет обычныйMQGet для очереди, обрабатывает содержимое сообщения, как требуется, а затем выдает MQCommit.(Я буду использовать собственные имена API MQ в моих примерах, потому что это не зависит от языка.) Если это транзакция XA, приложение будет вызывать MQBegin, чтобы начать транзакцию, но для однофазной фиксации транзакция предполагается.В обоих случаях MQCommit завершает транзакцию, которая удаляет сообщения из очереди.Пока сообщения синхронизированы, никакой другой экземпляр приложения не может их извлечь;MQ просто доставляет следующее доступное сообщение.Если транзакция откатывается, следующая MQGet из любого потока извлекает ее, предполагая доставку FIFO.

Есть несколько примеров в:
[WMQ install home]\tools\dotnet\samples\cs\xms\simple\wmq\
... и SimpleXAConsumer.csодин пример, который показывает версию XA этого.Версия без XA проще, так как вам не нужен внешний координатор, глаголы MQBegin и так далее.Если вы начнете с одного из них, убедитесь, что в нем не указано исключительное использование очереди, и вы можете запустить несколько экземпляров, используя одну и ту же конфигурацию.В качестве альтернативы возьмите часть примера, которая включает создание соединения, обработку сообщений, закрытие и уничтожение соединения, и оберните все это в класс средства порождения потока.

[ Вставьте обычный совет по использованию последней версии.версия занятия здесь. ]

...