Подписчики ZeroMQ с разной скоростью видят одинаковые сообщения - PullRequest
0 голосов
/ 26 апреля 2018

Я использую zmq 2.2 (я знаю более старую версию) в c ++, чтобы создать издателя с несколькими подключенными подписчиками, которые читают сообщения с разной скоростью. Исходя из моего понимания документов, а также ответа Питера Хинтьенса здесь , у каждого подписчика есть своя собственная очередь, а у издателя очередь на каждого подключенного подписчика. Это может указывать на то, что каждый подписчик получает сообщения от издателя независимо от других подписчиков.

Тем не менее, во фрагменте кода ниже быстрые и медленные подписчики получают одинаковые сообщения или точно такие же сообщения (это происходит даже тогда, когда я увеличиваю время ожидания в точке A и изменяю ZMQ_HWM в точке B ).

Может кто-нибудь пролить свет на то, почему это происходит?

#include <zmq.hpp>
#include <unistd.h>
#include <iostream>
#include <vector>
#include <future>
using socket_t = zmq::socket_t;
using context_t = zmq::context_t;
using msg_t = zmq::message_t;
using namespace std;

vector<int> slow_consumer(int64_t hwm, int to_read)
{
    vector<int> v;
    context_t context{1};
    socket_t socket(context, ZMQ_SUB);
    socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
    socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    socket.connect("tcp://localhost:5554");
    msg_t msg;
    sleep(3);  // 3 seconds
    for (int i = 0; i < to_read; i++)
    {
        socket.recv(&msg);
        usleep(10000);  // 10 miliseconds ___________________________POINT A
        v.emplace_back(*reinterpret_cast<int*>(msg.data()));
    }
    return v;
}
vector<int> fast_consumer(int64_t hwm, int to_read)
{
    vector<int> v;
    context_t context{1};
    socket_t socket(context, ZMQ_SUB);
    socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
    socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    socket.connect("tcp://localhost:5554");
    msg_t msg;
    for (int i = 0; i < to_read; i++)
    {
        socket.recv(&msg);
        v.emplace_back(*reinterpret_cast<int*>(msg.data()));
    }
    return v;
}
void publisher(int64_t hwm)
{
    context_t context{1};
    socket_t socket(context, ZMQ_PUB);
    socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
    socket.bind("tcp://*:5554");
    int count = 0;
    while (true) {
        msg_t msg(sizeof(count));
        memcpy(msg.data(), &count, sizeof(count));
        socket.send(msg);
        count++;
    }
}

int main() 
{
    int64_t hwm = 1;  // __________________________________________POINT B
    int to_read = 20;
    auto fast = async(launch::async, fast_consumer, hwm, to_read);
    auto slow = async(launch::async, slow_consumer, hwm, to_read);
    hwm = 1;  // Don't queue anything on the publisher
    thread pub(publisher, hwm);
    auto slow_v = slow.get();
    auto fast_v = fast.get();

    cout << "fast    slow" << endl;
    for (int i = 0; i < fast_v.size(); i ++)
    {
        cout << fast_v[i] << "   " << slow_v[i] << endl;
    }
    exit(0);
}

Скомпилировано с: g++ -o mixed mixed_speed_consumers.cpp -g -lzmq -lpthread по GCC 6.3

Пример вывода:

fast    slow
 25988   305855
 52522   454312
 79197   477807
106365   502594
132793   528551
159236   554519
184486   581419
209208   606411
234483   629298
256122   651159
281188   675031
305855   701533  // Messages on the fast subscriber starting here line up with messages on the slow subscriber
454312   727817
477807   754154
502594   778654
528551   804137
554519   830677
581419   854959
606411   878841
629298   902601

1 Ответ

0 голосов
/ 26 апреля 2018

каждый подписчик имеет свою очередь

Да, это так ...

это происходит из спроектированных свойств экземпляра PUB * .Context(), где происходит управление очередью отправки (подробнее об этом немного позже).

Можно прочитать краткое описание основных концептуальных приемов в [ Иерархия ZeroMQ менее чем за пять секунд ].

Это может означать, что каждый подписчик получает сообщения от издателя независимо от других подписчиков.

Да, это так ...

нет взаимодействия между соответствующими " приватными " - очередями. Здесь важна ZMQ_HWM в ее побочной роли семантики "Blocker".

В этой настройке минималистичный ZMQ_HWM защищает / блокирует любую новую запись от вставки в PUB сторону "private" -sending-Queue (размер no глубже, чем в соответствии с ZMQ_HWM == 1), до тех пор, пока он не будет успешно удален («удаленным» SUB -side Context() -s автономно асинхронным "внутренним") связанная с транспортом инициатива, при возможной (повторной) загрузке этой SUB стороны "частной" приемной очереди (размер, опять же, не глубже, чем в соответствии с ZMQ_HWM == 1 )

Другими словами, полезные нагрузки PUB.send() -s будут эффективно отбрасываться до тех пор, пока удаленные *_SUB.recv() -s не будут выгружать "блокирующую" -payload из их «удаленная» - очередь приема Context() -объекта (рассчитанная таким образом, чтобы не иметь возможности хранить какую-либо одну полезную нагрузку больше, чем единица - в соответствии с ZMQ_HWM == 1).

Таким образом, PUB.send() -er сработало более ~ 902601 сообщений , во время ( тайного блокирования ) тест на получение примерно 20 из них на стороне SUB (== to_read).

Все эти 902581+ сообщения просто выбрасывались прямо на стороне PUB при Context() при вызове .send() -метод.


Как это на самом деле работает внутри? упрощенный вид внутри Context()

Учитывая приведенный выше пример макета, управляемый пул очередей Context() растет / сжимается в соответствии с .connect() -едровыми одноранговыми узлами, которые появляются и исчезают, однако в API ZeroMQ v2.2 имея на стороне TX и RX одинаковый потолок High Water Mark. Как задокументировано, попытки .send() чего-либо выше этого предела отбрасываются.

TIME                   _____________________________
v                     [                             ]
v                     [                             ]
v                     [                             ]
v                     [                             ]
v                     PUB.setsockopt(  ZMQ_HWM, 1 );]
v                     PUB.send()-s     [        |   ]
v                        :             [        +-----------------QUEUE-length ( a storage depth ) is but one single message
v    _________________   :             [             
v   [                 ]  :             [Context()-managed pool-of-QUEUE(s)
v   [                 ]  :             [
v   [                 ]  :             [          ___________________
v   [                 ]  :             [         [                   ]
v   FAST_SUB.connect()---:------------>[?]       [                   ]
v   FAST_SUB.recv()-s    :             [?]       [                   ]
v           :            :             [?]       [                   ]
v           :            :             [?][?]<---SLOW_SUB.connect()  ]
v           :            :             [?][?]    SLOW_SUB.recv()-s   ]
v           :            .send(1)----->[1][1]            :
|       1 <-.recv()--------------------[?][1]            :
|           :                          [?][1]            :
|           :            .send(2)----->[2][1]            :
|       2 <-.recv()--------------------[?][1]            :
|           :                          [?][1]            :
|           :            .send(3)----->[3][1]            :
|       3 <-.recv()--------------------[?][?]------------.recv()-> 1
|           :                          [?][?]            :
|           :            .send(4)----->[4][4]            :
|       4 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(5)----->[5][4]            :
|       5 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(6)----->[6][4]            :
|       6 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(7)----->[7][4]            :
|       7 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(8)----->[8][4]            :
|       8 <-.recv()--------------------[?][4]            :
|           :                          [?][4]            :
|           :            .send(9)----->[9][4]            :
|       9 <-.recv()--------------------[?][?]------------.recv()-> 4
|           :                          [?][?]            :
|           :            .send(A)----->[A][A]            :
|       A <-.recv()--------------------[?][A]
|           :                          [?][A]
|           :            .send(B)----->[B][A]
|       B <-.recv()--------------------[?][A]
v           :                          [  [
v           :                          [
v           :
v

"Сообщения на быстрый абонент , начиная с на линии с сообщениями на медленный абонент "

Нет, этого не происходит . Здесь нет «очереди», но есть просто совпадение длительностей, когда fast- SUB еще не достиг 20x .recv() -s, до медленного (-ed) - SUB, наконец, достигнуто после блокировка sleep( 3 ).

Начальный «разрыв» - это просто влияние фазы sleep( 3 ), где более медленный - SUB не пытается ничего получить

main(){
|  
| async(launch::async,fast|_fast____________|
| async(launch::async,slow|     .setsockopt |_slow____________|
| ...                     |     .setsockopt |     .setsockopt |
| ...                     |     .connect    |     .setsockopt |
| thread                  |      ~~~~~~?    |     .connect    |
| |_pub___________________|      ~~~~~~?    |      ~~~~~~?    |
| |    .setsockopt        |      ~~~~~~?    |      ~~~~~~?    |
| |    .bind              |      ~~~~~~?    |      ~~~~~~?    |
| |     ~~~~~~?           |      ~~~~~~?    |      ~~~~~~?    |
| |     ~~~~~~=RTO        |      ~~~~~~?    |      ~~~~~~?    |
| |    .send()-s  1,2,..99|      ~~~~~~?    |      ~~~~~~?    |
| |    .send()-s  23456,..|      ~~~~~~=RTO |      ~~~~~~=RTO |
| |    .send()-s  25988,..|  25988 --> v[ 0]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s  52522,..|  52522 --> v[ 1]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s  79197,..|  79197 --> v[ 2]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 106365,..| 106365 --> v[ 3]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 132793,..| 132793 --> v[ 4]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 159236,..| 159236 --> v[ 5]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 184486,..| 184486 --> v[ 6]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 209208,..| 209208 --> v[ 7]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 234483,..| 234483 --> v[ 8]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 256122,..| 256122 --> v[ 9]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 281188,..| 281188 --> v[10]|     :  slow still sleep( 3 )-s before going to .recv() it's first message
| |    .send()-s 305855,..| 305855 --> v[11]| 305855 --> v[ 0]|// Messages on the fast subscriber starting here line up with messages on the slow subscriber
| |    .send()-s 454312,..| 454312 --> v[12]| 454312 --> v[ 1]|
| |    .send()-s 477807,..| 477807 --> v[13]| 477807 --> v[ 2]|
| |    .send()-s 502594,..| 502594 --> v[14]| 502594 --> v[ 3]|
| |    .send()-s 528551,..| 528551 --> v[15]| 528551 --> v[ 4]|
| |    .send()-s 554519,..| 554519 --> v[16]| 554519 --> v[ 5]|
| |    .send()-s 581419,..| 581419 --> v[17]| 581419 --> v[ 6]|
| |    .send()-s 606411,..| 606411 --> v[18]| 606411 --> v[ 7]|
| |    .send()-s 629298,..| 629298 --> v[19]| 629298 --> v[ 8]|
| |    .send()-s 651159,..|                 | 651159 --> v[ 9]|
| |    .send()-s 675031,..|     return v    | 675031 --> v[10]|
| |    .send()-s 701533,..|_________________| 701533 --> v[11]|
| |    .send()-s 727817,..|                 | 727817 --> v[12]|
| |    .send()-s 754154,..|                 | 754154 --> v[13]|
| |    .send()-s 778654,..|                 | 778654 --> v[14]|
| |    .send()-s 804137,..|                 | 804137 --> v[15]|
| |    .send()-s 830677,..|                 | 830677 --> v[16]|
| |    .send()-s 854959,..|                 | 854959 --> v[17]|
| |    .send()-s 878841,..|                 | 878841 --> v[18]|
| |    .send()-s 902601,..|                 | 902601 --> v[19]|
| |    .send()-s 912345,..|                 |                 |
| |    .send()-s 923456,..|                 |     return v    |
| |    .send()-s 934567,..|                 |_________________|
| |    .send()-s 945678,..|
| |    .send()-s 956789,..|
| |    .send()-s 967890,..|
| |    .send()-s 978901,..|
| |    .send()-s 989012,..|
| |    .send()-s 990123,..|
| |    .send()-s ad inf,..|                    

Хотя код PUB обязательно вызывает .send() -s как можно быстрее, он локальный Context() -экземпляр не зарезервировал больше места, чем для принятия только одного такого сообщения все остальные безмолвно отбрасывались всякий раз, когда соло-позиция в очереди была занята.

Всякий раз, когда маркер HWM == 1 возвращался к нулю, внутренняя механика позволяла следующему другому .send() передавать фактическое содержимое сообщения (полезную нагрузку) в хранилище очереди и все грядущие попытки последовать за .send() снова начали молча отбрасываться из-за логики, связанной с HWM.

...