Поместить асинхронное сообщение в очередь WebSphere MQ - PullRequest
2 голосов
/ 22 декабря 2011

Я пытаюсь поместить постоянные сообщения в очередь WebSphere MQ, однако они должны быть асинхронными. Мне кажется, что единственный способ заставить асинхронную работу работать - это непостоянные сообщения (под этим я подразумеваю, что putSuccessCount равно количеству сообщений, помещенных в MQAsyncStatus, в остальных случаях оно равно нулю). Код ниже обрисовывает в общих чертах то, что я пытаюсь сделать:

import com.ibm.mq.MQAsyncStatus;
import com.ibm.mq.MQDestination;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.MQConstants;

public class MQPutTest extends TestCase{

    private static Logger log = Logger.getLogger(MQPutTest.class);

    public void testPut() throws Exception{

        Hashtable<String, Object> props = new Hashtable<String, Object>();
        props.put(MQConstants.CHANNEL_PROPERTY, "my_channel");
        props.put(MQConstants.PORT_PROPERTY, 1414);
        props.put(MQConstants.HOST_NAME_PROPERTY, "localhost");

        String qManager = "my_queue_manager"; 
        MQQueueManager qMgr = new MQQueueManager(qManager, props);

        int openOptions = MQConstants.MQOO_OUTPUT | MQConstants.MQOO_INPUT_AS_Q_DEF;

        MQDestination queue = qMgr.accessQueue("my_queue", openOptions);

        MQPutMessageOptions pmo = new MQPutMessageOptions();
        pmo.options = MQConstants.MQPMO_ASYNC_RESPONSE;

        MQMessage message = new MQMessage();
        message.format = MQConstants.MQFMT_STRING;
        message.writeString("test message");
        queue.put(message, pmo);

        queue.close();
        MQAsyncStatus asyncStatus = qMgr.getAsyncStatus();
        qMgr.disconnect();
    }
}

Я связываю увеличение производительности с большим количеством сообщений с тем фактом, что очередь настроена как непостоянная, а не как сообщения, которые ставятся асинхронными. В расширенных свойствах очереди в MQ explorer я установил тип ответа "положить" по умолчанию "Асинхронный", но это не имеет никакого эффекта.

Любая помощь будет оценена.

Ответы [ 2 ]

0 голосов
/ 23 декабря 2011

Исходя из комментариев в вопросе, WMQ не имеет понятия «постоянная очередь» (отсюда и причина, по которой я сменил название поста). Атрибут очереди для постоянства никоим образом не меняет то, как QMgr обрабатывает очередь или любые сообщения в ней. Все, что он делает, это сообщает программе, какую опцию персистентности использовать, если программист не может явно установить это значение. То, является ли какое-либо данное сообщение постоянным, зависит от того, указывало MQMD постоянство, когда сообщение было впервые помещено в очередь. Любая очередь может содержать постоянные и непостоянные сообщения в любой комбинации.

В частности, во фрагменте кода, который он публикует, не указывается постоянство с использованием message.setPersistence(), поэтому он наследует независимо от значения по умолчанию для очереди. Это, в свою очередь, зависит от значения, унаследованного от атрибута очереди. Ни в коем случае настройка атрибута очереди не переопределяет явную настройку из приложения .

Таким образом, разница в производительности, которую вы видите, действительно, скорее всего, отражает настройку атрибута DEFPSIST очереди, но это не означает, что асинхронные операции не работают. Помните, что асинхронные размещения никоим образом не уменьшают время, необходимое для помещения сообщения в очередь. QMgr имеет один и тот же путь кода для сохранения сообщения независимо от того, является ли установка синхронной. Разница в том, что ваше приложение больше не ждет, пока QMgr завершит свою работу. Вполне возможно, что когда вы вызываете обновление MQAsyncStatus, WMQ еще не написал каких-либо сообщений . Это даже более вероятно, если все они находятся в одной единице работы, потому что WMQ не будет возвращать статус для всех сообщений, пока не завершится обработка COMMIT. Если вы не позвоните COMMIT явно, это произойдет, когда вы закроете очередь.

Вы можете убедиться в этом, повторяя вызов qMgr.getAsyncStatus() каждую секунду в течение нескольких секунд после COMMIT или CLOSE. Вы должны увидеть, что первое не возвращает успешно отправленных сообщений, но в конечном итоге вы можете учесть их все.

Кстати, вопрос о том, являются ли сообщения постоянными , почти всегда должен обрабатываться в вашем коде . Постоянство сообщений обычно выводится как бизнес-требование, которое учитывается при разработке приложения. Поэтому ответственность за соблюдение требования лежит на менеджере проекта и разработчике. Единственный способ надежно убедиться в его соблюдении - приложение явно вызывает setPErsistence(). В противном случае приложение будет использовать значение из атрибута очереди DEFPSIST, чтобы неявно вызвать setPersistence(). Так зачем беспокоиться о разрешении такого типа по умолчанию в API? Потому что есть несколько законных случаев, когда постоянство должно быть в состоянии изменить во время выполнения . Написание приложения для преднамеренного принятия значения по умолчанию и повторного открытия очереди после выполнения каждой единицы работы. Во всех остальных случаях постоянство должно быть установлено в программе или в управляемых объектах.

Наконец, если вы помещаете 10 000 сообщений в одну единицу работы, еще одна причина, по которой вы можете получить ответ о том, что ни одно сообщение не было успешно помещено, - это недостаток места в журнале или не настраиваемые параметры для размещения нагрузки. Например, если для MAXUMSGS установлено значение меньше 10 000, вся ваша транзакция будет откатываться. С другой стороны, если MAXUMSGS установлено правильно, но размер и количество первичных и вторичных журналов недостаточны для хранения объема данных в транзакции, тогда снова будет выполнена откат всей транзакции. Вам следует немного поиграться с интервалом COMMIT, поскольку оптимальное значение зависит от размера сообщений по отношению к размеру очереди и буферов журнала. Когда вы превышаете объем данных, которые можно оптимизировать в одну операцию записи, дополнительные сообщения фактически снижают производительность. Когда люди помещают 10 000 сообщений в одну единицу работы, это вряд ли когда-либо влияет на производительность, а скорее потому, что это их подходящая единица восстановления, а соответствующее снижение производительности является вторичным по отношению к требованию восстановления.

0 голосов
/ 22 декабря 2011

Если можно потерять одно или два сообщения, вы можете добавить сообщения во внутреннюю очередь отправки, например,

ExecutorService service = Executors.newSingleThreadedExecutor();

// build you message here

// add the message to be sent asynchronously.
service.execute(new Runnable() {
    public void run() {
         queue.put(message, pmo);
    }
});

Любые сообщения в очереди, когда процесс умирает, теряются.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...