ActiveMQ теряет много сообщений при переходе на другой ресурс (только по темам). Производитель пишет 1000 сообщений в теме, в то время как потребитель читает из той же темы. В середине процесса я закрываю мастер ActiveMQ, и процесс продолжается с ведомым устройством ActiveMQ. Когда переход сделан, много сообщений потеряно (~ 100 сообщений). Продукт, над которым я работаю, включает в себя не потерять сообщения. Что я мог сделать для настойчивости на темах? Производитель:
#include <activemq\library\ActiveMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\ActiveMQConnection.h>
#include <activemq\core\ActiveMQConnectionFactory.h>
#include <activemq\core\ActiveMQSession.h>
#include <activemq\core\ActiveMQConsumer.h>
#include <activemq\core\ActiveMQQueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\ActiveMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\ActiveMQMessage.h>
#include <activemq\commands\BrokerInfo.h>
#include <activemq\exceptions\ConnectionFailedException.h>
#include <string>
#include <vector>
#include <iostream>
#include <fstream>
#include <decaf\lang\Throwable.h>
std::string _amqURI("failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2");
const std::string _username("user");
const std::string _password("pass");
const std::string _host("localhost");
const std::string _destination("Test.AMQ.bogcretu.Topic");
std::string _garbageMessage("GARBAGE0_GARBAGE1_GARBAGE2_GARBAGE3_GARBAGE4_GARBAGE5_GARBAGE6_GARBAGE7_GARBAGE8_GARBAGE9");
int _countMessages = 1000;
int _multiplyFactor = 100;
std::string _bodyMessage = "";
void CreateMessage()
{
for (int i = 0; i < _multiplyFactor; i++) {
_bodyMessage += _garbageMessage;
}
}
int main()
{
activemq::library::ActiveMQCPP::initializeLibrary();
CreateMessage();
activemq::core::ActiveMQConnectionFactory factory;
factory.setBrokerURI(_amqURI);
std::auto_ptr<cms::TextMessage> message;
std::auto_ptr<cms::Connection> connection(factory.createConnection(_username, _password));
connection->start();
std::auto_ptr<cms::Session> session(connection->createSession());
std::auto_ptr<cms::Destination> destionation(session->createTopic(_destination));
std::auto_ptr<cms::MessageProducer> producer(session->createProducer(destionation.get()));
producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT);
for (int i = 0; i < _countMessages; i++) {
std::stringstream ss;
ss << i;
std::string number = ss.str();
message.reset(session->createTextMessage(number));
producer->send(message.get());
std::cout << i << std::endl;
}
//message.reset(session->createTextMessage("DONE"));
//producer->send(message.get());
//connection->close();
//activemq::library::ActiveMQCPP::shutdownLibrary();
return 0;
}
Потребитель:
#include <activemq\library\ActiveMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\ActiveMQConnection.h>
#include <activemq\core\ActiveMQConnectionFactory.h>
#include <activemq\core\ActiveMQSession.h>
#include <activemq\core\ActiveMQConsumer.h>
#include <activemq\core\ActiveMQQueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\ActiveMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <cms\MessageListener.h>
#include <cms\Message.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\ActiveMQMessage.h>
#include <activemq\commands\BrokerInfo.h>
#include <activemq\exceptions\ConnectionFailedException.h>
#include <string>
#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <decaf\lang\Throwable.h>
std::string amqURI("failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2");
class MsgListener : public cms::MessageListener
{
public:
std::string _amqURI;
cms::Connection *_connection;
cms::Session* _session;
cms::Destination* _destination;
cms::MessageConsumer* _consumer;
bool _sessionTransacted;
bool _useTopic;
MsgListener(std::string amqURI, bool sessionTransacted, bool useTopic = false) : _amqURI(amqURI), _sessionTransacted(sessionTransacted), _useTopic(useTopic), _connection(NULL), _session(NULL), _destination(NULL), _consumer(NULL)
{
this->_connection = cms::ConnectionFactory::createCMSConnectionFactory(this->_amqURI)->createConnection();
this->_connection->start();
/*if (this->_sessionTransacted == true) {
this->_session = this->_connection->createSession(cms::Session::SESSION_TRANSACTED);
}
else {
this->_session = this->_connection->createSession(cms::Session::AUTO_ACKNOWLEDGE);
}*/
this->_session = this->_connection->createSession(cms::Session::DUPS_OK_ACKNOWLEDGE);
if (useTopic) {
this->_destination = this->_session->createTopic("Test.AMQ.bogcretu.Topic");
}
else {
this->_destination = this->_session->createQueue("Test.AMQ.bogcretu.Topic");
}
this->_consumer = this->_session->createConsumer(this->_destination);
this->_consumer->setMessageListener(this);
/*std::cout.flush();
std::cerr.flush();*/
}
~MsgListener()
{
}
void onMessage(const cms::Message* CMSMessage)
{
static int count = 0;
try
{
const cms::TextMessage* textMessage = dynamic_cast<const cms::TextMessage*> (CMSMessage);
std::string text = "";
if (textMessage != NULL) {
text = textMessage->getText();
}
else {
text = "NOT A TEXTMESSAGE!";
}
std::cout << "(" << count << ", " << text << ")" << std::endl;
count++;
}
catch (cms::CMSException& e)
{
e.printStackTrace();
}
if (this->_sessionTransacted) {
this->_session->commit();
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
};
int main()
{
activemq::library::ActiveMQCPP::initializeLibrary();
MsgListener consumer(amqURI, true, true);
while (true);
//activemq::library::ActiveMQCPP::shutdownLibrary();
}
Consumer_durable:
#include <activemq\library\ActiveMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\ActiveMQConnection.h>
#include <activemq\core\ActiveMQConnectionFactory.h>
#include <activemq\core\ActiveMQSession.h>
#include <activemq\core\ActiveMQConsumer.h>
#include <activemq\core\ActiveMQQueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\ActiveMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <cms\MessageListener.h>
#include <cms\Message.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\ActiveMQMessage.h>
#include <activemq\commands\BrokerInfo.h>
#include <activemq\exceptions\ConnectionFailedException.h>
#include <string>
#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <decaf\lang\Throwable.h>
std::string amqURI("failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2");
class MsgListener : public cms::MessageListener
{
public:
std::string _amqURI;
cms::Connection *_connection;
cms::Session* _session;
cms::Destination* _destination;
cms::MessageConsumer* _consumer;
bool _sessionTransacted;
bool _useTopic;
MsgListener(std::string amqURI, bool sessionTransacted, bool useTopic = false) : _amqURI(amqURI), _sessionTransacted(sessionTransacted), _useTopic(useTopic), _connection(NULL), _session(NULL), _destination(NULL), _consumer(NULL)
{
this->_connection = cms::ConnectionFactory::createCMSConnectionFactory(this->_amqURI)->createConnection();
this->_connection->start();
/*if (this->_sessionTransacted == true) {
this->_session = this->_connection->createSession(cms::Session::SESSION_TRANSACTED);
}
else {
this->_session = this->_connection->createSession(cms::Session::AUTO_ACKNOWLEDGE);
}*/
this->_session = this->_connection->createSession(cms::Session::DUPS_OK_ACKNOWLEDGE);
if (useTopic) {
this->_destination = this->_session->createTopic("Test.AMQ.bogcretu.Topic");
}
else {
this->_destination = this->_session->createQueue("Test.AMQ.bogcretu.Topic");
}
//this->_consumer = this->_session->createConsumer(this->_destination);
static const cms::Topic * topic = dynamic_cast<const cms::Topic*>(this->_destination);
this->_consumer = this->_session->createDurableConsumer(topic, "sub_name", "");
this->_consumer->setMessageListener(this);
/*std::cout.flush();
std::cerr.flush();*/
}
~MsgListener()
{
}
void onMessage(const cms::Message* CMSMessage)
{
static int count = 0;
try
{
const cms::TextMessage* textMessage = dynamic_cast<const cms::TextMessage*> (CMSMessage);
std::string text = "";
if (textMessage != NULL) {
text = textMessage->getText();
}
else {
text = "NOT A TEXTMESSAGE!";
}
std::cout << "(" << count << ", " << text << ")" << std::endl;
count++;
}
catch (cms::CMSException& e)
{
e.printStackTrace();
}
if (this->_sessionTransacted) {
this->_session->commit();
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
};
int main()
{
activemq::library::ActiveMQCPP::initializeLibrary();
MsgListener consumer(amqURI, true, true);
while (true);
//activemq::library::ActiveMQCPP::shutdownLibrary();
}