Отказоустойчивый ведущий ведомый ActiveMQ теряет сообщения - PullRequest
0 голосов
/ 06 ноября 2019

ActiveMQ теряет много сообщений при переходе на другой ресурс (только по темам). Производитель пишет 1000 сообщений в теме, в то время как потребитель читает из той же темы. В середине процесса я закрываю мастер ActiveMQ, и процесс продолжается с ведомым устройством ActiveMQ. Когда переход сделан, много сообщений потеряно (~ 100 сообщений). Продукт, над которым я работаю, включает в себя не потерять сообщения. Что я мог сделать для настойчивости на темах? Производитель:

#include <activemq\library\ActiveMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\ActiveMQConnection.h>
#include <activemq\core\ActiveMQConnectionFactory.h>
#include <activemq\core\ActiveMQSession.h>
#include <activemq\core\ActiveMQConsumer.h>
#include <activemq\core\ActiveMQQueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\ActiveMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\ActiveMQMessage.h>
#include <activemq\commands\BrokerInfo.h>

#include <activemq\exceptions\ConnectionFailedException.h>
#include <string>
#include <vector>
#include <iostream>
#include <fstream>
#include <decaf\lang\Throwable.h>

std::string _amqURI("failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2");
const std::string _username("user");
const std::string _password("pass");
const std::string _host("localhost");
const std::string _destination("Test.AMQ.bogcretu.Topic");

std::string _garbageMessage("GARBAGE0_GARBAGE1_GARBAGE2_GARBAGE3_GARBAGE4_GARBAGE5_GARBAGE6_GARBAGE7_GARBAGE8_GARBAGE9");
int _countMessages = 1000;
int _multiplyFactor = 100;
std::string _bodyMessage = "";

void CreateMessage()
{
    for (int i = 0; i < _multiplyFactor; i++) {
        _bodyMessage += _garbageMessage;
    }
}

int main()
{
    activemq::library::ActiveMQCPP::initializeLibrary();
    CreateMessage();
    activemq::core::ActiveMQConnectionFactory factory;
    factory.setBrokerURI(_amqURI);
    std::auto_ptr<cms::TextMessage> message;
    std::auto_ptr<cms::Connection> connection(factory.createConnection(_username, _password));

    connection->start();

    std::auto_ptr<cms::Session> session(connection->createSession());
    std::auto_ptr<cms::Destination> destionation(session->createTopic(_destination));
    std::auto_ptr<cms::MessageProducer> producer(session->createProducer(destionation.get()));

    producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT);

    for (int i = 0; i < _countMessages; i++) {
        std::stringstream ss;
        ss << i;
        std::string number = ss.str();
        message.reset(session->createTextMessage(number));
        producer->send(message.get());
        std::cout << i << std::endl;
    }

    //message.reset(session->createTextMessage("DONE"));
    //producer->send(message.get());

    //connection->close();

    //activemq::library::ActiveMQCPP::shutdownLibrary();

    return 0;
}

Потребитель:

#include <activemq\library\ActiveMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\ActiveMQConnection.h>
#include <activemq\core\ActiveMQConnectionFactory.h>
#include <activemq\core\ActiveMQSession.h>
#include <activemq\core\ActiveMQConsumer.h>
#include <activemq\core\ActiveMQQueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\ActiveMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <cms\MessageListener.h>
#include <cms\Message.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\ActiveMQMessage.h>
#include <activemq\commands\BrokerInfo.h>

#include <activemq\exceptions\ConnectionFailedException.h>
#include <string>
#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <decaf\lang\Throwable.h>

std::string amqURI("failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2");

class MsgListener : public cms::MessageListener
{
public:
    std::string _amqURI;
    cms::Connection *_connection;
    cms::Session* _session;
    cms::Destination* _destination;
    cms::MessageConsumer* _consumer;
    bool _sessionTransacted;
    bool _useTopic;

    MsgListener(std::string amqURI, bool sessionTransacted, bool useTopic = false) : _amqURI(amqURI), _sessionTransacted(sessionTransacted), _useTopic(useTopic), _connection(NULL), _session(NULL), _destination(NULL), _consumer(NULL)
    {
        this->_connection = cms::ConnectionFactory::createCMSConnectionFactory(this->_amqURI)->createConnection();
        this->_connection->start();

        /*if (this->_sessionTransacted == true) {
            this->_session = this->_connection->createSession(cms::Session::SESSION_TRANSACTED);
        }
        else {
            this->_session = this->_connection->createSession(cms::Session::AUTO_ACKNOWLEDGE);
        }*/

        this->_session = this->_connection->createSession(cms::Session::DUPS_OK_ACKNOWLEDGE);

        if (useTopic) {
            this->_destination = this->_session->createTopic("Test.AMQ.bogcretu.Topic");
        }
        else {
            this->_destination = this->_session->createQueue("Test.AMQ.bogcretu.Topic");
        }

        this->_consumer = this->_session->createConsumer(this->_destination);
        this->_consumer->setMessageListener(this);

        /*std::cout.flush();
        std::cerr.flush();*/


    }

    ~MsgListener()
    {

    }

    void onMessage(const cms::Message* CMSMessage)
    {
        static int count = 0;

        try
        {

            const cms::TextMessage* textMessage = dynamic_cast<const cms::TextMessage*> (CMSMessage);
            std::string text = "";
            if (textMessage != NULL) {
                text = textMessage->getText();
            }
            else {
                text = "NOT A TEXTMESSAGE!";

            }

            std::cout << "(" << count << ", " << text << ")" << std::endl;
            count++;

        }
        catch (cms::CMSException& e)
        {
            e.printStackTrace();
        }

        if (this->_sessionTransacted) {
            this->_session->commit();
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
};

int main()
{
    activemq::library::ActiveMQCPP::initializeLibrary();
    MsgListener consumer(amqURI, true, true);
    while (true);
    //activemq::library::ActiveMQCPP::shutdownLibrary();
}

Consumer_durable:

#include <activemq\library\ActiveMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\ActiveMQConnection.h>
#include <activemq\core\ActiveMQConnectionFactory.h>
#include <activemq\core\ActiveMQSession.h>
#include <activemq\core\ActiveMQConsumer.h>
#include <activemq\core\ActiveMQQueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\ActiveMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <cms\MessageListener.h>
#include <cms\Message.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\ActiveMQMessage.h>
#include <activemq\commands\BrokerInfo.h>

#include <activemq\exceptions\ConnectionFailedException.h>
#include <string>
#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <decaf\lang\Throwable.h>

std::string amqURI("failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2");

class MsgListener : public cms::MessageListener
{
public:
    std::string _amqURI;
    cms::Connection *_connection;
    cms::Session* _session;
    cms::Destination* _destination;
    cms::MessageConsumer* _consumer;
    bool _sessionTransacted;
    bool _useTopic;

    MsgListener(std::string amqURI, bool sessionTransacted, bool useTopic = false) : _amqURI(amqURI), _sessionTransacted(sessionTransacted), _useTopic(useTopic), _connection(NULL), _session(NULL), _destination(NULL), _consumer(NULL)
    {
        this->_connection = cms::ConnectionFactory::createCMSConnectionFactory(this->_amqURI)->createConnection();
        this->_connection->start();

        /*if (this->_sessionTransacted == true) {
            this->_session = this->_connection->createSession(cms::Session::SESSION_TRANSACTED);
        }
        else {
            this->_session = this->_connection->createSession(cms::Session::AUTO_ACKNOWLEDGE);
        }*/

        this->_session = this->_connection->createSession(cms::Session::DUPS_OK_ACKNOWLEDGE);

        if (useTopic) {
            this->_destination = this->_session->createTopic("Test.AMQ.bogcretu.Topic");
        }
        else {
            this->_destination = this->_session->createQueue("Test.AMQ.bogcretu.Topic");
        }

        //this->_consumer = this->_session->createConsumer(this->_destination);



        static const cms::Topic * topic = dynamic_cast<const cms::Topic*>(this->_destination);
        this->_consumer = this->_session->createDurableConsumer(topic, "sub_name", "");
        this->_consumer->setMessageListener(this);

        /*std::cout.flush();
        std::cerr.flush();*/


    }

    ~MsgListener()
    {

    }

    void onMessage(const cms::Message* CMSMessage)
    {
        static int count = 0;

        try
        {

            const cms::TextMessage* textMessage = dynamic_cast<const cms::TextMessage*> (CMSMessage);
            std::string text = "";
            if (textMessage != NULL) {
                text = textMessage->getText();
            }
            else {
                text = "NOT A TEXTMESSAGE!";
            }

            std::cout << "(" << count << ", " << text << ")" << std::endl;
            count++;

        }
        catch (cms::CMSException& e)
        {
            e.printStackTrace();
        }

        if (this->_sessionTransacted) {
            this->_session->commit();
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
};

int main()
{
    activemq::library::ActiveMQCPP::initializeLibrary();
    MsgListener consumer(amqURI, true, true);
    while (true);
    //activemq::library::ActiveMQCPP::shutdownLibrary();
}

1 Ответ

1 голос
/ 06 ноября 2019

Если вы хотите сохранить сообщение, то вы должны либо использовать Очереди, либо использовать подписки Durable Topic. Темы сами по себе не сохраняют сообщения независимо от постоянного режима производителя, фактически, если подписчики не подписаны и вы отправляете в тему сообщение, которое отбрасывается, аналогичным образом конфигурация ActiveMQ для управления постоянным лимитом ожидающих сообщений для тем будетотбрасывать старые сообщения в теме, где потребители не могут следить, поскольку тема имеет низкий уровень гарантии обслуживания.

Вам нужно использовать Очередь и установить постоянную для производителя, или убедиться, что у вас есть существующая подписка на длительную тему, и отправлять сообщения, используя производителя, который назначает постоянный флаг, если вы хотите, чтобы сообщение было записано вхранилище и восстанавливается при сбое брокера.

...