ZeroMQ - CZMQ - шаблон Majordomo - потеря запросов при отправке БЫСТРО - PullRequest
1 голос
/ 10 апреля 2020

Я уже некоторое время бьюсь об этом и надеюсь, что один из вас сможет указать мне правильное направление.

Проблема в том, что всякий раз, когда запросы передаются брокеру FAST, не все из них делают это (одному) работнику.

Если я ввожу некоторую задержку между запросами (см. sleep (1) в коде клиента), все работает просто хорошо, но очевидно, что это не приемлемо

Для воспроизведения проблемы, с которой я столкнулся, я создал эту упрощенную версию своего кода:

Клиент:

#include <stdio.h>
#include <stdlib.h>

#include "czmq.h"
#include "majordomo_library.h"


#define SAFEFREE(x)                                                            \
  if (x) {                                                                     \
    free(x);                                                                   \
    x = NULL;                                                                  \
  }

int main() {

  char service[] = "bb-test";
  char endpoint[] = "ipc:///tmp/bbtest.ipc";

  mdp_client_t **clients = NULL;
  zmsg_t *request = NULL;

  char request_str[128];
  char *cmd = NULL, *reply = NULL;
  int i = 0, loops = 10;

  /* Create array of ptr for <loop> clients */
  clients = calloc(loops, sizeof(mdp_client_t *));
  assert(clients != NULL);

  /* create <loops> client sessions and send a request on each */
  for (i = 0; i < loops; i++) {
    /* create a new MDP client session */
    clients[i] = mdp_client_new(endpoint);
    if (!clients[i]) {
      fprintf(stderr, "Error %s\r\n", mdp_client_reason(clients[i]));
      exit(-1);
    }
    /* create new request message */
    request = zmsg_new();
    assert(request != NULL);
    memset(request_str, 0, 128);
    sprintf(request_str, "Request %d", i);
    zmsg_addstr(request, request_str);
    /* send the message as an MDP client request */
    if(mdp_client_request(clients[i], service, &request) ==0 ) {
      fprintf(stdout, "%s sent\r\n", request_str);
    } else {
      fprintf(stderr, "%s NOT SENT (%s)\r\n", request_str, mdp_client_reason(clients[i]));
    }

    zmsg_destroy(&request);

    /* If I add sleep time here, so the worker can process the
     * request and send the reply back, it works just fine.
     * As soon as a drop all requests to the broker, the worker gets
     * stuck at zsock_recv() stuck after processing only one, or a
     * subset of the requests )
     * */
    //sleep(1);
  }

  /*  collect the replies       */
  for (i = 0; i < loops; i++) {

    /* create a message pipe to read the replies */
    zsock_t *client_sock = mdp_client_msgpipe(clients[i]);
    assert(client_sock);
    /* set receive timeout (60s) */
    zsock_set_rcvtimeo(client_sock, 10000);
    /* get the message as "ss" (string and string) into cmd and reply*/
    if (zsock_recv(client_sock, "ss", &cmd, &reply) == 0) {
      fprintf(stdout, "Received: %s: %s\r\n", cmd, reply);
    } else {
      fprintf(stderr, "Failed to receive reply %s\r\n",
              mdp_client_reason(clients[i]));
    }

    /* close the message pipe */
    zmq_close(client_sock);

    /* destroy the client session */
    if (clients[i]) {
      mdp_client_destroy(&clients[i]);
    }

    SAFEFREE(cmd);
    SAFEFREE(reply);
  }

  return 0;
}

здесь как запустить mdp_broker по умолчанию:

#include <stdio.h>
#include <stdlib.h>

#include "czmq.h"
#include "mdp_broker.h"

int main() {

  int rc = 0;

  zactor_t *broker = zactor_new(mdp_broker, "test_MDP-broker");
  assert(broker != NULL);
  zstr_send(broker, "VERBOSE");
  zstr_sendx(broker, "BIND", "ipc:///tmp/bbtest.ipc", NULL);


  getchar();

  zactor_destroy(&broker);

  exit(0);
}

и, наконец, вот рабочий:

#include <stdio.h>
#include <stdlib.h>

#include "czmq.h"
#include "mdp_worker.h"

#define SAFEFREE(x)                                                            \
  if (x) {                                                                     \
    free(x);                                                                   \
    (x) = NULL;                                                                \
  }

int main() {

  char service[] = "bb-test";
  char endpoint[] = "ipc:///tmp/bbtest.ipc";

  mdp_worker_t *worker_session = NULL;
  zsock_t *worker_sock = NULL;
  zframe_t *address = NULL;

  char *cmd = NULL;
  char *request = NULL;
  char *reply = NULL;
  int rc = 0;

  /* create new worker and register the service with the broker */
  worker_session = mdp_worker_new(endpoint, service);
  assert(worker_session != NULL);
  mdp_worker_set_verbose(worker_session);

  worker_sock = mdp_worker_msgpipe(worker_session);
  assert(worker_sock != NULL);

  while (1) {

    rc = zsock_recv(worker_sock, "sfs", &cmd, &address, &request);
    if (rc != 0) {
      fprintf(stderr, "Failed to receive message: %s\r\n",
              mdp_worker_reason(worker_session));
      continue;
    }

    fprintf(stdout, "Got message \"%s\"\r\n", request);

    reply = calloc(strlen(request) + 10, sizeof(char));
    assert(reply != NULL);
    snprintf(reply, strlen(request) + 10, "%s - reply", request);

    /*  Create reply message */
    zmsg_t *msg_response = zmsg_new();
    assert(msg_response != NULL);

    /* Send */
    rc = zmsg_addstr(msg_response, reply);
    assert(rc == 0);

    rc = mdp_worker_send_final(worker_session, &address, &msg_response);
    fprintf(rc == 0 ? stdout : stderr, "Sending reply (\"%s\") was %s\r\n\r\n",
            reply, rc == 0 ? "successful" : "UNSUCCESSFUL");

    zmsg_destroy(&msg_response);
    SAFEFREE(cmd)
    SAFEFREE(request)
    SAFEFREE(reply)
  }
  mdp_worker_destroy(&worker_session);
  exit(0);
}

Результаты со сном (1)

Клиент:

D: 20-04-10 20:59:35 connected to ipc:///tmp/bbtest.ipc
Request 0 sent
D: 20-04-10 20:59:36 connected to ipc:///tmp/bbtest.ipc
Request 1 sent
D: 20-04-10 20:59:37 connected to ipc:///tmp/bbtest.ipc
Request 2 sent
D: 20-04-10 20:59:38 connected to ipc:///tmp/bbtest.ipc
Request 3 sent
D: 20-04-10 20:59:39 connected to ipc:///tmp/bbtest.ipc
Request 4 sent
D: 20-04-10 20:59:40 connected to ipc:///tmp/bbtest.ipc
Request 5 sent
D: 20-04-10 20:59:41 connected to ipc:///tmp/bbtest.ipc
Request 6 sent
D: 20-04-10 20:59:42 connected to ipc:///tmp/bbtest.ipc
Request 7 sent
D: 20-04-10 20:59:43 connected to ipc:///tmp/bbtest.ipc
Request 8 sent
D: 20-04-10 20:59:44 connected to ipc:///tmp/bbtest.ipc
Request 9 sent
Received: FINAL: Request 0 - reply
Received: FINAL: Request 1 - reply
Received: FINAL: Request 2 - reply
Received: FINAL: Request 3 - reply
Received: FINAL: Request 4 - reply
Received: FINAL: Request 5 - reply
Received: FINAL: Request 6 - reply
Received: FINAL: Request 7 - reply
Received: FINAL: Request 8 - reply
Received: FINAL: Request 9 - reply

Process finished with exit code 0

Работник:

D: 20-04-10 20:59:32 connected to ipc:///tmp/bbtest.ipc
Got message "Request 0"
Sending reply ("Request 0 - reply") was successful

Got message "Request 1"
Sending reply ("Request 1 - reply") was successful

Got message "Request 2"
Sending reply ("Request 2 - reply") was successful

Got message "Request 3"
Sending reply ("Request 3 - reply") was successful

Got message "Request 4"
Sending reply ("Request 4 - reply") was successful

Got message "Request 5"
Sending reply ("Request 5 - reply") was successful

Got message "Request 6"
Sending reply ("Request 6 - reply") was successful

Got message "Request 7"
Sending reply ("Request 7 - reply") was successful

Got message "Request 8"
Sending reply ("Request 8 - reply") was successful

Got message "Request 9"
Sending reply ("Request 9 - reply") was successful

и без задержки

Клиент:

D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 0 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 1 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 2 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 3 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 4 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 5 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 6 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 7 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 8 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 9 sent
Received: FINAL: Request 0 - reply
Received: FINAL: Request 1 - reply
Received: FINAL: Request 2 - reply
Received: FINAL: Request 3 - reply

Рабочий:

D: 20-04-10 21:03:40 connected to ipc:///tmp/bbtest.ipc
Got message "Request 0"
Sending reply ("Request 0 - reply") was successful

Got message "Request 1"
Sending reply ("Request 1 - reply") was successful

Got message "Request 2"
Sending reply ("Request 2 - reply") was successful

Got message "Request 3"
Sending reply ("Request 3 - reply") was successful

рабочий блокирует на

rc = zsock_recv(worker_sock, "sfs", &cmd, &address, &request);

Подробный вывод брокера сообщает мне, что все запросы поступают к брокеру, но (в этом случае) существует только 3 сообщения WORKER_FINAL. Количество успешно обработанных запросов варьируется, на самом деле, это не всегда только 3, но с ростом числа запросов оно прерывается НА НЕКОТОРЫЙ ТОЧКЕ.

Есть идеи? кто-нибудь?? довольно-пожалуйста ???

1 Ответ

0 голосов
/ 13 апреля 2020

Я определил проблему. Это связано с mdp_broker. Начиная с 603a304fb674733bd00c0314761242da013a327f с субботы, 29 февраля 10:20:52 2020, брокер не отправляет запросы в очереди, если не существует события «worker_ready» или «client_request». Таким образом, если в очередь добавляются запросы, а работника нет, общее количество полученных и отправленных запросов будет отличаться, а некоторые запросы останутся в очереди без обработки до истечения времени ожидания.

MDP-Broker Необходимо также проверять / отправлять любые запросы, как только / пока есть запросы в очереди и рабочий ожидает - независимо от входящего события handle_request.

Поэтому я добавил вызов s_dispatch () в конец функции handle_final () в mdp_broker. c. Это заставляет посредника проверять наличие ожидающих запросов и отправлять их отправку каждый раз, когда работник повторно добавляется в список ожидающих работников после обработки предыдущих запросов.

handle_final () в mdp_broker. c поэтому должен выглядеть так:

static void
handle_worker_final (client_t *self)
{
    mdp_msg_t *msg = self->message;
    mdp_msg_t *client_msg = mdp_msg_new();
    // Set routing id, messageid, service, body
    zframe_t *address = mdp_msg_address(msg);

    mdp_msg_set_routing_id(client_msg, address);
    mdp_msg_set_id(client_msg, MDP_MSG_CLIENT_FINAL);
    const char *service_name = self->service_name;
    mdp_msg_set_service(client_msg, service_name);
    zmsg_t *body = mdp_msg_get_body(msg);
    mdp_msg_set_body(client_msg, &body);
    mdp_msg_send(client_msg, self->server->router);

    // Add the worker back to the list of waiting workers.
    char *identity = zframe_strhex(mdp_msg_routing_id(msg));

    worker_t *worker =
        (worker_t *) zhash_lookup(self->server->workers, identity);
    assert(worker);
    zlist_append(self->server->waiting, worker);
    service_t *service = (service_t *) zhash_lookup(self->server->services,
        worker->service->name);
    assert(service);
    zlist_append(service->waiting, worker);

    zstr_free(&identity);
    mdp_msg_destroy(&client_msg);
    s_service_dispatch(service);
}

исправление передано команде zmq / majordomo. Я обновлю этот пост снова, после того, как если будет совершено.

Joerg

...