ZMQ Multi-part flusher: Можете ли вы получить общее количество деталей в сообщении из нескольких частей, полученном через ZeroMQ, не читая их все? - PullRequest
1 голос
/ 26 сентября 2019

Я реализую простой шаблон REQ-REP с ZeroMQ в C, используя обмен сообщениями из нескольких частей.Большинство моих сообщений имеют строго 4 части каждая (туда и обратно) с несколькими исключениями.Чтобы применить правило, мне нужно определить общее количество частей полученного сообщения, состоящего из нескольких частей.Знать, если это <= 4, легко.Вот моя функция приемника: </p>

#define BUFMAX  64 // Maximum size of text buffers
#define BUFRCV  63 // Maximum reception size of text buffers (reserve 1 space to add a terminal '\0')

char mpartstr[4][BUFMAX];


int recv_multi(void *socket,int *aremore)

// Receive upto the first 4 parts of a multipart message into mpartstr[][].
// Returns the number of parts read (upto 4) or <0 if there is an error.
// Returns -1 if there is an error with a zmq function.
// It sets aremore=1 if there are still more parts to read after the fourth
// part (or aremore=0 if not).
{
 int len,rc,rcvmore,pdx,wrongpard=0;
 size_t rcvmore_size = sizeof(rcvmore);

 pdx=0;
 len=zmq_recv(socket, mpartstr[pdx], BUFRCV, 0);
 if(len==-1) return len;

 mpartstr[pdx][len]='\0';
 rc=zmq_getsockopt(socket,ZMQ_RCVMORE,&rcvmore,&rcvmore_size); if(rc) return -1;

 pdx++;
 if(rcvmore==0){*aremore=0; return pdx;}

 while(rcvmore){
   len=zmq_recv (socket, mpartstr[pdx], BUFRCV, 0); if(len==-1) return len; mpartstr[pdx][len]='\0';
   rc=zmq_getsockopt(socket,ZMQ_RCVMORE,&rcvmore,&rcvmore_size); if(rc) return -1; 
   pdx++;
   if(pdx==4) break;
 }

 *aremore=rcvmore;
 return pdx;
}

Все хорошо.Но теперь в моей функции main() я проверяю, есть ли еще детали, видя значение aremore.В тех случаях, когда я не ожидаю большего, я отправлю сообщение об ошибке отправителю, но обнаружил, что ZeroMQ не нравится, если я не читаю ВСЕ части сообщения, состоящего из нескольких частей (он читает оставшиесячасти этого старого сообщения, состоящего из нескольких частей, в следующий раз, когда я вызываю функцию zmq_recv(), даже после того, как отправляю сообщение и ожидаю новый чистый ответ из нескольких частей).

Итак, что мне действительно нужно, так этосвоего рода функция очистки, чтобы очистить оставшиеся части сообщения, которое содержит более 4 частей, которые я хочу удалить.Пока что единственный способ сделать это - уродливая произвольная функция исчерпания грубой силы, например, так (aremore будет иметь значение 1 для начала - оно было установлено предыдущей функцией):

int recv_exhaust(void *socket,int *aremore)

// Receive the remainder of a multipart message and discard the contents.
// Use this to clean out a multi-part 'inbox' from a wrongly sent message.
// Returns 0 on success
// Returns -1 on zmq function failure
// Returns -2 on failure to exhaust even after 1000 parts.
{                                                          
 int len,rc,rcvmore,pdx;
 size_t rcvmore_size = sizeof(rcvmore);

 pdx=1;
 rcvmore=*aremore;

 while(rcvmore){
   len=zmq_recv(socket, mpartstr[0], BUFRCV, 0); if(len==-1) return len;
   rc=zmq_getsockopt(socket,ZMQ_RCVMORE,&rcvmore,&rcvmore_size); if(rc) return -1; 
   pdx++;
   if(pdx>1000) return -2;
 }

 return 0;
}

Если не существует выделенного API-интерфейса «сбрасывателя», то, по крайней мере, я мог бы избавиться от своего произвольного предела в 1000 сообщений, если бы у меня был какой-то способ заранее узнать, сколько частей (всего) имеет данное сообщение, состоящее из нескольких частей.Конечно, ZeroMQ знает это, потому что сообщения, состоящие из нескольких частей, отправляются как целый блок.Кто-нибудь может указать мне, как найти эту информацию?Или там есть подходящая функция / метод «промывки»?(пожалуйста, для стандарта C - не C ++ / C # и т. д.).Заранее спасибо.

1 Ответ

1 голос
/ 26 сентября 2019

Q : Может кто-нибудь указать мне, как найти эту информацию?

Да.

Q : есть ли подходящая функция / метод «промывки»?

Да и нет:

Насколько далекокак ZeroMQ v2.x вплоть до v4.3.1, не было явного API-вызова для «сбрасывателя»

Красота и возможности интеллектуальных сообщений с малой задержкой, которые обеспечивает дизайн ZeroMQ, основаны намудро созданный Zen-of-Zero: всегда предпочитая производительность комфорту - как показывают Zero-copy, Zero-Guarantee и другие парадигмы.

Наивный (и я несу большую боль, чтобы упростить это до использованияпримитивная блокировка recv () ...) "flusher" должен пройти весь путь до тех пор, пока ZMQ_RCVMORE не помечает NACK больше частей "за пределами" multi-frame-last-message (или zmq_msg_more() == 0 соответствует тому же самому).Тем не менее, все эти операции выполняют только обработку указателей, никакие данные не «перемещаются / копируются / читаются» из ОЗУ, только назначаются указатели, поэтому это действительно быстро и эффективно для ввода-вывода:

int    more;
size_t more_size = sizeof ( more );
do {
      zmq_msg_t part;                       /* Create an empty ØMQ message to hold the message part */
      int rc = zmq_msg_init (&part);           assert (rc == 0 && "MSG_INIT failed" );
      rc = zmq_msg_recv (&part, socket, 0); /* Block until a message is available to be received from socket */
                                               assert (rc != -1 && "MSG_RECV failed" );
                                            /* Determine if more message parts are to follow */
      rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
                                               assert (rc == 0 && "GETSOCKOPT failed" );
      zmq_msg_close (&part);
      } while (more);

Учитывая задокументированные свойства RFC-23 / ZMTP , существует лишь несколько (кодированная телеметрическая кодировка) гарантия:

1) все сообщения отправляются / доставляются:

  • атомарно (либо безошибочные двоичные идентичные все кадры, либо ни одного вообще)
  • не более одного раза (для соответствующего партнера)
  • в порядке

2) из нескольких частей сообщения дополнительно получаютвнутренняя (внутриполосная) -телеметрия «совет» о состоянии :

  • помеченный битами состояние { 1: more-frames-follow| 0: no-more-frames }
  • aпомеченный битами размер-типа { 0: 8b-direct-octet | 1: 64b-"network-endian"-coded }
  • a размер -vice { 0~255: direct-size | 0~2^63-1: 64b-"network-endian"-coded-size }

Документировано zmq_recv()API также довольно явно в этом:

Multi-partСообщения

Сообщение ØMQ состоит из 1 или более частей сообщения.Каждая часть сообщения является независимой zmq_msg_t сама по себе.ØMQ обеспечивает атомарную доставку сообщений: одноранговые узлы должны получать либо все части сообщения, либо ни одной вообще. Общее количество частей сообщения не ограничено, кроме как доступной памятью.

Приложение, обрабатывающее сообщения, состоящие из нескольких частей, должно использовать опцию ZMQ_RCVMORE zmq_getsockopt (3) после вызоваzmq_msg_recv (), чтобы определить, есть ли дополнительные части для получения.


Каким бы «уродливым» это не выглядело при первом чтении, наихудший случай, который поместится в памяти, - это огромное количествосообщений небольшого размера внутри многочастного фрейма сообщения.

В результате время "избавления от них" не равно нулю, но преимущества компактного и эффективного внутреннего ZMTP-Телеметрия и обработка потоков с малой задержкой - гораздо более важная цель (и она была достигнута).

Если сомневаетесь, сначала сравните наихудший случай:

a) «произведите» около 1E9 multipartфреймы с сообщениями, транспортирующие полезные нагрузки нулевого размера (без данных, но все кадры сообщений)

b) «настройка» простейшей возможной «топологии» PUSH/PULL

в) "выбрать" транспортный класс на ваш выбор { inproc:// | ipc:// | tipc:// | ... | vmci:// } - лучший без стека inproc:// (я бы начал стресс-тест с этим)

d) секундомер, такой слепой-механический-нулевой ярлык "сбрасывает" между ReferencePoint-S:, когда zmq_poll( ZMQ_POLLIN ) имеет POSACK-наличие присутствие любого читаемого содержимого и ReferencePoint-E: когда последнее из многочастного сообщения из нескольких частей было зациклено слепым «flusher» -circus.


ИНТЕРПРЕТАЦИЯ РЕЗУЛЬТАТА:

Эти наносекунды, потраченные между [S] и [E], считаются свидетельством наихудшего случая количества времени, которое получено "козел отпущения" в заведомо слепой цирковой "флешер" .В реальных сценариях использования будут дополнительные причины для того, чтобы потенциально тратить еще больше времени на то же самое.

Тем не менее, справедливо не забывать , что ответственность за отправку таких {заведомо таких размеров |плохо сформированный} - multi-frame-BEAST (s) является основной причиной любых операционных рисков при работе с этим в противном случае с ультра- низкой задержкой , high- (почти линейный) фокусируемый на масштабируемости фреймворк для обмена сообщениями / сигнализации.

Это искусство Zen-of-Zero, которое имеетпозволил этому случиться.Все благодаря Питеру ХИНТЖЕНСУ и его команде, возглавляемой Мартином СЮСТРИКОМ, мы все должны им огромное спасибо за то, что смогли и дальше работать с их наследием.

...