ZeroMQ: получение клиентского сообщения с помощью ZMQ_STREAM - PullRequest
0 голосов
/ 14 апреля 2020

Я бы хотел получить доступ к сообщению, полученному от TCP-партнера, используя сокет ZeroMQ ZMQ_STREAM. В следующем примере в C строка msg кажется пустой:

/* http.c */
#include <stdio.h>
#include <string.h>
#include <zmq.h>

int main(int argc, char* argv[])
{
    void *ctx    = zmq_ctx_new();
    void *socket = zmq_socket(ctx, ZMQ_STREAM);
    int rc       = zmq_bind(socket, "tcp://127.0.0.1:8080");

    uint8_t id[256];
    size_t  id_size = 256;

    char    msg[256];
    size_t  msg_size = 256;

    char http_response[] =
        "HTTP/1.0 200 OK\r\n"
        "Content-Type: text/html\r\n"
        "\r\n"
        "<h1>Hello, World!</h1>";

    while (1)
    {
        id_size = zmq_recv(socket, id, 256, 0);

        msg_size = zmq_recv(socket, msg, sizeof(msg), 0);
        msg[msg_size] = '\0';
        printf("REQUEST: %s\n", msg);

        zmq_send(socket, id, id_size, ZMQ_SNDMORE);
        zmq_send(socket, http_response, strlen(http_response), ZMQ_SNDMORE);

        zmq_send(socket, id, id_size, ZMQ_SNDMORE);
        zmq_send(socket, 0, 0, ZMQ_SNDMORE);
    }

    zmq_close(socket);
    zmq_ctx_destroy(ctx);

    return 0;
}

Компиляция и выполнение:

$ cc -Wall -I/usr/local/include/ -L/usr/local/lib/ -o http http.c -lzmq
$ ./http
REQUEST:
REQUEST:

Я не могу использовать API CZMQ в качестве Я должен положиться на FFI до libzmq.

Ответы [ 2 ]

0 голосов
/ 15 апреля 2020

Проблема заключается просто в неправильной обработке формата сообщений ZeroMQ. Каждый раз, когда клиент подключается, сначала отправляется пустое сообщение [id, ] (без полезной нагрузки). Следующее сообщение [id, payload] содержит строку запроса HTTP GET в своей полезной нагрузке.

    // Get [id, ] message on client connection.
    id_size = zmq_recv(socket, id, 256, 0);
    printf("ID SIZE.: %zu\n", id_size);

    if (id_size == 0)
        continue;

    // Empty payload.
    buffer_size = zmq_recv(socket, buffer, 256, 0);

    // Get [id, playload] message.
    id_size = zmq_recv(socket, id, 256, 0);

    if (id_size == 0)
        continue;

    // Get payload.
    // [...]
0 голосов
/ 15 апреля 2020

У кода макета есть предостережение - он не подчиняется опубликованному API, так как никогда не отправляет ни одного сообщения ни одному пиру (из-за пропущенного когда-либо не SNDMORE помеченного кадра) + лучше .bind() в существующий, достижимый TCP-адрес (не эмулируемый интерфейс абстрактной обратной связи в 127.0.0.1), иначе не было бы никакого внешнего пира, который когда-либо мог бы инициализировать связь и доставить любой запрос только следующему процессу raw[] или чему-либо еще в бесконечном while -l oop:

...
#define FOREVER 1

int main( int argc, char* argv[] )
{
    void *ctx    = zmq_ctx_new ();                  assert ( ctx         && "FAILED to instantiate a ZeroMQ Context" );
    void *socket = zmq_socket ( ctx, ZMQ_STREAM );  assert ( socket      && "FAILED to instantiate a STREAM Socket" ) ;
    int   rc = zmq_bind (socket, "tcp://*:8080");   assert ( rc == 0     && "FAILED to .bind on <tcp>://address:port>" );

    size_t  id_size = 256;
    uint8_t id [id_size];

    size_t  raw_size = 256;
    uint8_t raw [raw_size];

    while ( FOREVER ) {                          /* Get HTTP request; ID frame and then PAYLOAD frame ........................*/
        id_size = zmq_recv ( socket, id, 256, 0 );  assert ( id_size > 0 && "FAILED to .recv() an ID frame" );
        do {  
                  raw_size = zmq_recv (socket, raw, 256, 0 ); assert ( raw_size >= 0 && "FAILED to .recv() a PAYLOAD frame" );
        } while ( raw_size == 256 );             /* consumes the whole PAYLOAD frame till the last Byte ......................*/

        char http_response [] =  "HTTP/1.0 200 OK\r\n"
                                 "Content-Type: text/plain\r\n"
                                 "\r\n"
                                 "Hello, World!";
      //----------------------------------------------------------------------------------------------------------------------  
        zmq_send ( socket, id, //......................... ID to .send() a PAYLOAD towards
                           id_size,
                           ZMQ_SNDMORE ); //.............. flag == _SNDMORE
        zmq_send ( socket, http_response, //..............         PAYLOAD content loaded
                           strlen ( http_response),
                           0 ); //........................ flag == 0 i.e. LAST FRAME ... == CAN .send() THE MESSAGE AS A WHOLE
      //----------------------------------------------------------------------------------------------------------------------
        zmq_send ( socket, id, //......................... ID to .send() a PAYLOAD towards
                           id_size,
                           ZMQ_SNDMORE ); //.............. flag == _SNDMORE
        zmq_send ( socket, 0, //.......................... 0
                           0, //.......................... 0    == a ZeroSized _STREAM PAYLOAD
                           0 ); //........................ flag == 0 i.e. LAST FRAME ... ZeroSized _STREAM PAYLOAD == .close()
      //----------------------------------------------------------------------------------------------------------------------
    }
    zmq_close ( socket );
    zmq_ctx_destroy ( ctx );

    return( 0 );
}
...