Как обработать управление потоком производителя в обмене сообщениями jms при использовании apache qpid - PullRequest
2 голосов
/ 13 января 2012

Я пытаюсь справиться с ситуацией управления потоком на стороне производителя.У меня есть очередь на qpid-брокере с установленным максимальным размером очереди.Также установите в очереди flow_stop_count и flow_resume_count.

теперь у производителя продолжает непрерывно генерировать сообщения до тех пор, пока не будет достигнуто значение flow_stop_count.При нарушении этого числа выдается исключение, которое обрабатывается слушателем исключений.Теперь через некоторое время потребитель в очереди наверстает упущенное и будет достигнут поток flow_resume_count.Вопрос в том, как производитель узнает об этом событии.

Вот пример кода производителя

    connection connection = connectionFactory.createConnection();
    connection.setExceptionListenr(new MyExceptionListerner());
    connection.start();
    Session session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
    Queue queue = (Queue)context.lookup("Test");
    MessageProducer producer = session.createProducer(queue);
    while(notStopped){
        while(suspend){//---------------------------how to resume this flag???
            Thread.sleep(1000);
        }
        TextMessage message = session.createTextMessage();
        message.setText("TestMessage");
        producer.send(message);
    }
    session.close();
    connection.close();

и для прослушивателя исключений

    private class MyExceptionListener implements ExceptionListener {
    public void onException(JMSException e) {
        System.out.println("got exception:" + e.getMessage());
        suspend=true;
    }
}

ТеперьИсключительный прослушиватель является универсальным прослушивателем исключений, поэтому не следует останавливать поток продюсера через него.

Возможно, мне нужен какой-то метод на уровне продюсера, что-то вроде produer.isFlowStopped() , который я могу использовать для проверки перед отправкой сообщения.Существует ли такая функциональность в qpid api.

На веб-сайте qpid имеется некоторая документация, в которой предлагается сделать это.Но я нигде не мог найти примеров того, как это делается.

Существует ли какой-то стандартный способ обработки такого сценария.

Ответы [ 3 ]

2 голосов
/ 22 января 2012

Из того, что я прочитал из документации Apache QPid, кажется, что flow_resume_count и flow_stop_count заставят производителей начать блокироваться.

Следовательно, единственной возможностью было бы программное обеспечение для регулярного опроса, пока сообщения не начнут возобновляться.

Выписка из здесь .

Если производитель отправляет в очередь, которая переполнена, брокер ответит, указав клиенту не отправлять больше сообщений. Это приводит к тому, что любые будущие попытки отправки будут блокироваться до тех пор, пока брокер не отменит распоряжение управления потоком.

При блокировке клиент будет периодически регистрировать факт его блокировки в ожидании управления потоком.

WARN AMQSession - принудительное управление потоком, установленное брокером WARN AMQSession - отправка сообщения задерживается на 5 с из-за принудительного управления потоком посредником WARN AMQSession - отправка сообщения задерживается на 10 секунд из-за принудительного управления потоком посредником По истечении заданного периода времени пересылка будет отключена и вызовет JMSException для вызывающего кода.

ОШИБКА AMQSession - Ошибка отправки сообщения из-за ожидания тайм-аута при принудительном управлении потоком посредником.

Из этой документации следует, что программное обеспечение, управляющее производителем, должно будет самостоятельно управлять. Таким образом, в основном, когда вы получаете исключение, что очередь переполнена, вам нужно будет отменить ее и, скорее всего, опросить и повторить попытку отправки ваших сообщений.

1 голос
/ 20 июня 2012

Управление потоком производителя еще не реализовано на клиенте JMS. Смотри https://issues.apache.org/jira/browse/QPID-3388

1 голос
/ 22 января 2012

Вы можете попытаться установить емкость (размер в байтах, при котором очередь считается заполненной) и flowResumeCapacity (размер очереди, при котором производители не перемещаются) для очереди.

send () будетзатем блокируется, если размер превышает значение емкости.

Вы можете взглянуть на этот файл теста в репозитории, чтобы получить представление.

...