Я использую boost :: asio и boost :: thread для реализации службы сообщений, которая принимает сообщения, отправляет их асинхронно , если сообщения нет или ставит в очередь сообщение, если есть обрабатываемые сообщения.
Скорость сообщений на мой взгляд высокая, около 2.000 сообщений в секунду . С таким количеством сообщений я сталкиваюсь с искаженным сообщением, хотя очень редко. В 2000 сообщениях повреждено около 4-8 . Я полагаю, что проблема связана с неправильным использованием библиотеки boost :: asio и / или boost :: thread.
Код, который я реализовал, в основном основан на этом уроке повышения . Я не могу найти ошибку, и поскольку основные сообщения работают, мне трудно сузить проблему.
Может быть, у кого-то еще есть идея, что здесь происходит не так?
В основном этот класс используется следующим образом:
(1) Конструктор вызывается в начале моей программы для запуска потока, то есть сервиса для приема и передачи сообщений
(2) Всякий раз, когда я хочу передать сообщение, я звоню MessageService::transmitMessage()
, который делегирует задачу с async_write
потоку, обрабатывающему очередь сообщений.
using namespace google::protobuf::io;
using boost::asio::ip::tcp;
MessageService::MessageService(std::string ip, std::string port) :
work(io_service), resolver(io_service), socket(io_service) {
messageQueue = new std::deque<AgentMessage>;
tcp::resolver::query query(ip, port);
endpoint_iterator = resolver.resolve(query);
tcp::endpoint endpoint = *endpoint_iterator;
socket.async_connect(endpoint, boost::bind(&MessageService::handle_connect,
this, boost::asio::placeholders::error, ++endpoint_iterator));
boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));
}
void MessageService::await() {
while (!messageQueue->empty()) {
signal(SIGINT, exit);
int messagesLeft = messageQueue->size();
sleep(3);
std::cout << "Pending Profiler Agents Messages: "
<< messageQueue->size() << std::endl;
if (messagesLeft == messageQueue->size()) {
std::cout << "Connection Error" << std::endl;
break;
}
}
std::cout << i << std::endl;
}
void MessageService::write(AgentMessage agentMessage, long systemTime,
int JVM_ID) {
agentMessage.set_timestamp(Agent::Helper::getCurrentClockCycle());
agentMessage.set_jvm_id(JVM_ID);
agentMessage.set_systemtime(systemTime);
io_service.post(boost::bind(&MessageService::do_write, this, agentMessage));
}
void MessageService::do_close() {
socket.close();
}
void MessageService::transmitMessage(AgentMessage agentMessage) {
++i;
boost::asio::streambuf b;
std::ostream os(&b);
ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os);
CodedOutputStream *coded_output = new CodedOutputStream(raw_output);
coded_output->WriteVarint32(agentMessage.ByteSize());
agentMessage.SerializeToCodedStream(coded_output);
delete coded_output;
delete raw_output;
boost::system::error_code ignored_error;
boost::asio::async_write(socket, b.data(), boost::bind(
&MessageService::handle_write, this,
boost::asio::placeholders::error));
}
void MessageService::do_write(AgentMessage agentMessage) {
bool write_in_progress = !messageQueue->empty();
messageQueue->push_back(agentMessage);
if (!write_in_progress) {
transmitMessage(agentMessage);
}
}
void MessageService::handle_write(const boost::system::error_code &error) {
if (!error) {
messageQueue->pop_front();
if (!messageQueue->empty()) {
transmitMessage(messageQueue->front());
}
} else {
std::cout << error << std::endl;
do_close();
}
}
void MessageService::handle_connect(const boost::system::error_code &error,
tcp::resolver::iterator endpoint_iterator) {
// can be used to receive commands from the Java profiler interface
}
MessageService::~MessageService() {
// TODO Auto-generated destructor stub
}
Заголовочный файл:
using boost::asio::ip::tcp;
class MessageService {
public:
MessageService(std::string ip, std::string port);
virtual ~MessageService();
void write(AgentMessage agentMessage, long systemTime, int JVM_ID);
void await();
private:
boost::asio::io_service io_service;
boost::asio::io_service::work work;
tcp::resolver resolver;
tcp::resolver::iterator endpoint_iterator;
tcp::socket socket;
std::deque<AgentMessage> *messageQueue;
void do_write(AgentMessage agentMessage);
void do_close();
void handle_write(const boost::system::error_code &error);
void handle_connect(const boost::system::error_code &error,
tcp::resolver::iterator endpoint_iterator);
void transmitMessage(AgentMessage agentMessage);
};