ZeroMQ ест мои данные .... но только иногда - PullRequest
3 голосов
/ 19 августа 2011

Итак, я исследую 0MQ как протокол передачи сообщений для моего кластерного приложения. Он предлагает тот тип асинхронного взаимодействия, который мне нужен.

До сих пор у меня был некоторый первоначальный успех при создании прототипа. Я могу отправлять и получать сообщения, используя несколько различных типов шаблонов.

В настоящее время я пытаюсь получить шаблон PUB/SUB, работающий с начальной синхронизацией через "рукопожатие" REP/REQ. Этот метод описан в руководстве ZeroMQ в разделе «Координация узлов». И метод работает довольно хорошо. Единственная проблема в том, что 0MQ, похоже, съедает ~ 50% моих сообщений или, по крайней мере, данные в них.

Я работаю с Qt, поэтому сообщения состоят из QString. Я упаковываю строку в QByteArray, используя QDataStream. Затем я пропускаю QByteArray через «провод» и распаковываю его с другой стороны. Я часто использовал этот метод для связи по другим протоколам (например, через сокеты tcp), и он хорошо работает.

У меня есть 4 «рабочих», подключающихся через PUB/SUB и REQ/REP к одному «менеджеру». На большинстве прогонов как минимум 1 и максимум 3 "рабочих" синхронизируются нормально. Однако, когда они этого не делают, они получают пустую строку.

Вот журналы для 4 клиентов:

alicia

[09:18:15.337] [Info] Logging initialized
[09:18:15.337] [Debug] Initializing client 
                       alicia
[09:18:15.337] [Debug] Attempting to receive START signal from server
[09:18:15.340] [Info] Received message: 
                      START
[    09:18:15.341] [Debug] Recieved a SYNC message
                       SYNC
[09:18:15.341] [Info] Received START signal from manager
[09:18:15.341] [Info] Received message: 
                      2

brenda

[09:18:15.337] [Info] Logging initialized
[09:18:15.337] [Debug] Initializing client 
                       brenda
[09:18:15.337] [Debug] Attempting to receive START signal from server
[09:18:15.340] [Info] Received message: 
                      START
[    09:18:15.340] [Debug] Recieved a SYNC message
                       SYNC
[09:18:15.340] [Info] Received START signal from manager
[09:18:15.340] [Info] Received message: 
                      START
[09:18:15.340] [Debug] Sending 
                       0
                       th message

carlie

[09:18:15.336] [Info] Logging initialized
[09:18:15.337] [Debug] Initializing client 
                       carlie
[09:18:15.337] [Debug] Attempting to receive START signal from server
[09:18:15.340] [Info] Received message: 
                      START
[    09:18:15.340] [Debug] Recieved a SYNC message
[09:18:15.340] [Fatal] carlie
                        Caught unhandled WASError:
                       Assertion "SOMEPATH/zmqworker.cpp" failed at SOMEPATH/zmqworker.cpp:52:virtual void was::ZMQWorker::run(): Invalid sync message

darcie:

[09:18:15.336] [Info] Logging initialized
[09:18:15.337] [Debug] Initializing client 
                       darcie
[09:18:15.337] [Debug] Attempting to receive START signal from server
[09:18:15.340] [Info] Received message: 
                      START
[    09:18:15.341] [Debug] Recieved a SYNC message
[09:18:15.341] [Fatal] darcie
                        Caught unhandled WASError:
                       Assertion "SOMEPATH/zmqworker.cpp" failed at SOMEPATH/zmqworker.cpp:52:virtual void was::ZMQWorker::run(): Invalid sync message

Вот большая часть кода:

из zmqtools.cpp

void sendQString( zmq::socket_t& socket, QString& str )
{
    QByteArray package;
    QDataStream packer( &package, QIODevice::WriteOnly );
    packer << str;
    zmq::message_t msg( package.data(), package.size(), NULL );
    WAS_ASSERT_MSG( socket.send( msg ), "Failed to send QString" );
}


void recvQString( zmq::socket_t& socket, QString& str )
{
    zmq::message_t msg;
    WAS_ASSERT_MSG( socket.recv( &msg ), "Failed to receive QString" );
    QByteArray package( ( char* )msg.data(), msg.size() );
    QDataStream unpacker( &package, QIODevice::ReadOnly );
    unpacker >> str;
}

С zmqworker.cpp

    ZMQWorker::ZMQWorker( const QString& clientName, QObject *parent ) :
    QThread( parent ),
    clientName( clientName ),
    context( 1 ),
    inMessageSocket( context, ZMQ_PULL ),
    outMessageSocket( context, ZMQ_PUSH ),
        inControlSocket( context, ZMQ_SUB ),
    synchronizeSocket( context, ZMQ_REQ )
{
    qxtLog->debug() << "Initializing client " << clientName;

    inMessageSocket.connect( "tcp://localhost:9900" );
    outMessageSocket.connect( "tcp://localhost:9901" );
    inControlSocket.connect( "tcp://localhost:9902" );
    inControlSocket.setsockopt( ZMQ_SUBSCRIBE, "", 0 );
    synchronizeSocket.connect( "tcp://localhost:9903" );

    messagePoll.fd = 0;
    messagePoll.events = ZMQ_POLLIN;
    messagePoll.revents = 0;
    messagePoll.socket = inMessageSocket;

    controlPoll.fd = 0;
    controlPoll.events = ZMQ_POLLIN;
    controlPoll.revents = 0;
    controlPoll.socket = inControlSocket;
}

void ZMQWorker::run()
{
    QString message;
    // Wait for start signal from server
    bool started = true;
    do
    {
        qxtLog->debug() << "Attempting to receive START signal from server";
        recvQString( inControlSocket, message );
        qxtLog->info() << "Received message: " << message;
        started = message == "START";
    } while( !started );

    message = "SYNC";
    sendQString( synchronizeSocket, message );
    recvQString( synchronizeSocket, message );
    qxtLog->debug() << "Recieved a SYNC message" << message;
    WAS_ASSERT_MSG( message == "SYNC", "Invalid sync message" );

    qxtLog->info() << "Received START signal from manager";

    int messagesSent = 0;
    forever
    {

        zmq::poll( &messagePoll, 1, 0 );
        if( messagePoll.revents & ZMQ_POLLIN )
        {
            recvQString( inMessageSocket, message );
            qxtLog->info() << "Received message: " << message;
        }

        zmq::poll( &controlPoll, 1, 0 );
        if( controlPoll.revents & ZMQ_POLLIN )
        {
            recvQString( inControlSocket, message );
            qxtLog->info() << "Received message: " << message;
            if( message == "STOP" )
                break;
        }


        if( messagesSent < 1000 )
        {
            qxtLog->debug() << "Sending " << messagesSent << "th message";
            QString message = QString::number( messagesSent );
            sendQString( outMessageSocket, message );
            messagesSent++;
        }
    }
}

С zmqmanager.cpp

ZMQManager::ZMQManager( const QString& serverName, unsigned clientCount, QObject* parent ) :
    QThread( parent ),
    serverName( serverName ),
    clientCount( clientCount ),
    context( 1 ),
    inMessageSocket( context, ZMQ_PULL ),
    outMessageSocket( context, ZMQ_PUSH ),
    outControlSocket( context, ZMQ_PUB ),
    synchronizeSocket( context, ZMQ_REP )
{
    qxtLog->debug() << "Initializing server " << serverName;


    outMessageSocket.bind( "tcp://*:9900" );
    inMessageSocket.bind( "tcp://*:9901" );
    outControlSocket.bind( "tcp://*:9902" );
    synchronizeSocket.bind( "tcp://*:9903" );


    messagePoll.fd = 0;
    messagePoll.events = ZMQ_POLLIN;
    messagePoll.revents = 0;
    messagePoll.socket = inMessageSocket;

    synchronizePoll.fd = 0;
    synchronizePoll.events = ZMQ_POLLIN;
    synchronizePoll.revents = 0;
    synchronizePoll.socket = synchronizeSocket;
}

void ZMQManager::run()
{
    QString message;
    unsigned clientsConnected = 0;
    do
    {
        qxtLog->debug() << "Publishing START signal";
        message = "START";
        sendQString( outControlSocket, message );

        zmq::poll( &synchronizePoll, 1, 5000 );
        if( synchronizePoll.revents & ZMQ_POLLIN )
        {
            qxtLog->debug() << "Checking for response to START signal";
            recvQString( synchronizeSocket, message );
            qxtLog->debug() << "Recieved a SYNC message" << message;
            WAS_ASSERT_MSG( message == "SYNC", "Invalid sync message" );
            sendQString( synchronizeSocket, message );
            clientsConnected++;
        }

    } while( clientsConnected < clientCount );

    qxtLog->info() << "Started and syncrhonized with clients";

    unsigned messagesSent = 0;
    unsigned messagesReceived = 0;
    do
    {
            zmq::poll( &messagePoll, 1, 0 );
        if( messagePoll.revents & ZMQ_POLLIN )
        {
            recvQString( inMessageSocket, message );
            qxtLog->info() << "Received message: " << message;
            messagesReceived++;
        }

        if( messagesSent < clientCount * 1000 )
        {
            qxtLog->debug() << "Sending a message";
            message = QString::number( messagesSent );
            sendQString( outMessageSocket, message );
            messagesSent++;
        }

    } while( messagesSent < clientCount * 1000 && messagesReceived < clientCount * 1000 );

    message = "STOP";
    sendQString( outControlSocket, message );

}

Итак, наконец-то мой вопрос:

  • Я что-то пропускаю в этом коде?
  • Я неправильно упаковываю / распаковываю сообщения ZeroMQ?
  • Я неправильно управляю синхронизацией REQ/REP?

Я был бы очень признателен за понимание того, кто имеет некоторый опыт работы с ZMQ, особенно со смешанными PUB/SUB, PUSH/PULL и REQ/REP шаблонами.

1 Ответ

8 голосов
/ 20 августа 2011

Проблема заключалась в том, что ZeroMQ не копирует данные при использовании конструктора

zmq::message_t( void*, int size );

Вместо этого ZeroMQ просто использует буфер, который вы предоставляете в качестве хранилища для сообщения.Таким образом, если этот буфер освобождается или перезаписывается до того, как сообщение действительно отправлено, вы получите мусор или ошибку сегмента.

Решение относительно простое.Вместо этого вы используете этот конструктор:

zmq::message_t( int size );

Он создает буфер указанного размера.Затем вы просто вручную копируете данные в сообщение перед отправкой.

Вот исправленные и работающие функции отправки:

void sendQString( zmq::socket_t& socket, QString& str )
{
    QByteArray package;
    QDataStream packer( &package, QIODevice::WriteOnly );
    packer << str;
    qxtLog->debug() << "sending a message of " << package.size() << " bytes";
    qxtLog->debug() << "the message says " << str;
    zmq::message_t msg( package.size() );
    memcpy( msg.data(), package.data(), package.size() );
    ASSERT_MSG( socket.send( msg ), "Failed to send QString" );
}

void recvQString( zmq::socket_t& socket, QString& str )
{    
    zmq::message_t msg;
    ASSERT_MSG( socket.recv( &msg ), "Failed to receive QString" );
    qxtLog->debug() << "received a message of " << (int)msg.size() << " bytes";
    QByteArray package( ( char* )msg.data(), msg.size() );
    QDataStream unpacker( &package, QIODevice::ReadOnly );
    unpacker >> str;
    qxtLog->debug() << "the message says " << str;
}

Спасибо ребятам на канале #zeromqна freenode я наконец смог определить проблему.

...