Неправильное использование boost :: asio и boost :: thread - PullRequest
4 голосов
/ 05 июля 2011

Я использую boost :: asio и boost :: thread для реализации службы сообщений, которая принимает сообщения, отправляет их асинхронно , если сообщения нет или ставит в очередь сообщение, если есть обрабатываемые сообщения.

Скорость сообщений на мой взгляд высокая, около 2.000 сообщений в секунду . С таким количеством сообщений я сталкиваюсь с искаженным сообщением, хотя очень редко. В 2000 сообщениях повреждено около 4-8 . Я полагаю, что проблема связана с неправильным использованием библиотеки boost :: asio и / или boost :: thread.

Код, который я реализовал, в основном основан на этом уроке повышения . Я не могу найти ошибку, и поскольку основные сообщения работают, мне трудно сузить проблему.

Может быть, у кого-то еще есть идея, что здесь происходит не так?

В основном этот класс используется следующим образом:

(1) Конструктор вызывается в начале моей программы для запуска потока, то есть сервиса для приема и передачи сообщений

(2) Всякий раз, когда я хочу передать сообщение, я звоню MessageService::transmitMessage(), который делегирует задачу с async_write потоку, обрабатывающему очередь сообщений.

using namespace google::protobuf::io;
using boost::asio::ip::tcp;

MessageService::MessageService(std::string ip, std::string port) :
    work(io_service), resolver(io_service), socket(io_service) {

    messageQueue = new std::deque<AgentMessage>;
    tcp::resolver::query query(ip, port);
    endpoint_iterator = resolver.resolve(query);

    tcp::endpoint endpoint = *endpoint_iterator;

    socket.async_connect(endpoint, boost::bind(&MessageService::handle_connect,
            this, boost::asio::placeholders::error, ++endpoint_iterator));

    boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));
}

void MessageService::await() {

    while (!messageQueue->empty()) {

        signal(SIGINT, exit);

        int messagesLeft = messageQueue->size();
        sleep(3);
        std::cout << "Pending Profiler Agents Messages: "
                << messageQueue->size() << std::endl;
        if (messagesLeft == messageQueue->size()) {
            std::cout << "Connection Error" << std::endl;
            break;
        }
    }

    std::cout << i << std::endl;
}

void MessageService::write(AgentMessage agentMessage, long systemTime,
        int JVM_ID) {
    agentMessage.set_timestamp(Agent::Helper::getCurrentClockCycle());
    agentMessage.set_jvm_id(JVM_ID);
    agentMessage.set_systemtime(systemTime);
    io_service.post(boost::bind(&MessageService::do_write, this, agentMessage));
}

void MessageService::do_close() {
    socket.close();
}

void MessageService::transmitMessage(AgentMessage agentMessage) {

    ++i;

    boost::asio::streambuf b;
    std::ostream os(&b);

    ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os);
    CodedOutputStream *coded_output = new CodedOutputStream(raw_output);

    coded_output->WriteVarint32(agentMessage.ByteSize());
    agentMessage.SerializeToCodedStream(coded_output);

    delete coded_output;
    delete raw_output;

    boost::system::error_code ignored_error;

    boost::asio::async_write(socket, b.data(), boost::bind(
            &MessageService::handle_write, this,
            boost::asio::placeholders::error));
}

void MessageService::do_write(AgentMessage agentMessage) {

    bool write_in_progress = !messageQueue->empty();
    messageQueue->push_back(agentMessage);

    if (!write_in_progress) {
        transmitMessage(agentMessage);
    }
}

void MessageService::handle_write(const boost::system::error_code &error) {

    if (!error) {
        messageQueue->pop_front();
        if (!messageQueue->empty()) {
            transmitMessage(messageQueue->front());
        }
    } else {
        std::cout << error << std::endl;
        do_close();
    }
}

void MessageService::handle_connect(const boost::system::error_code &error,
        tcp::resolver::iterator endpoint_iterator) {
    // can be used to receive commands from the Java profiler interface
}

MessageService::~MessageService() {
    // TODO Auto-generated destructor stub
}

Заголовочный файл:

    using boost::asio::ip::tcp;

class MessageService {
public:
    MessageService(std::string ip, std::string port);
    virtual ~MessageService();
    void write(AgentMessage agentMessage, long systemTime, int JVM_ID);
    void await();

private:
    boost::asio::io_service io_service;
    boost::asio::io_service::work work;
    tcp::resolver resolver;
    tcp::resolver::iterator endpoint_iterator;
    tcp::socket socket;
    std::deque<AgentMessage> *messageQueue;

    void do_write(AgentMessage agentMessage);

    void do_close();

    void handle_write(const boost::system::error_code &error);

    void handle_connect(const boost::system::error_code &error,
            tcp::resolver::iterator endpoint_iterator);

    void transmitMessage(AgentMessage agentMessage);
};

1 Ответ

2 голосов
/ 06 июля 2011

этот метод мне кажется сомнительным

void MessageService::transmitMessage(AgentMessage agentMessage) {
    ++i;

    boost::asio::streambuf b;
    std::ostream os(&b);

    ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os);
    CodedOutputStream *coded_output = new CodedOutputStream(raw_output);

    coded_output->WriteVarint32(agentMessage.ByteSize());
    agentMessage.SerializeToCodedStream(coded_output);

    delete coded_output;
    delete raw_output;

    boost::system::error_code ignored_error;

    boost::asio::async_write(socket, b.data(), boost::bind(
            &MessageService::handle_write, this,
            boost::asio::placeholders::error));
}

Вы, похоже, сериализуете AgentMessage (который должен быть передан через константную ссылку между прочим) в streambuf. Тем не менее, эти сериализованные данные не гарантируются, пока не будет вызван обработчик завершения async_write, который подробно описан в документации async_write

буферы

Один или несколько буферов, содержащих данные для записи. Хотя объект буфера может быть при необходимости копируется, право собственности на базовые блоки памяти сохраняются вызывающей стороной, которая должна гарантировать что они остаются в силе до обработчик называется.

чтобы решить эту проблему, убедитесь, что буфер остается в области действия до тех пор, пока не будет вызван обработчик завершения. Один из способов сделать это - передать буфер в качестве аргумента ограниченному обработчику завершения:

boost::asio::async_write(socket, b.data(), boost::bind(
            &MessageService::handle_write, this,
            boost::asio::placeholders::error,
            coded_output
            // ^^^ buffer goes here
            ));

затем удалите его из обработчика завершения. Я бы посоветовал вам также взглянуть на использование shared_ptr вместо обнаженных указателей.

...