каждый подписчик имеет свою очередь
Да, это так ...
это происходит из спроектированных свойств экземпляра PUB
* .Context()
, где происходит управление очередью отправки (подробнее об этом немного позже).
Можно прочитать краткое описание основных концептуальных приемов в [ Иерархия ZeroMQ менее чем за пять секунд ].
Это может означать, что каждый подписчик получает сообщения от издателя независимо от других подписчиков.
Да, это так ...
нет взаимодействия между соответствующими " приватными " - очередями. Здесь важна ZMQ_HWM
в ее побочной роли семантики "Blocker".
В этой настройке минималистичный ZMQ_HWM
защищает / блокирует любую новую запись от вставки в PUB
сторону "private" -sending-Queue (размер no глубже, чем в соответствии с ZMQ_HWM == 1
), до тех пор, пока он не будет успешно удален («удаленным» SUB
-side Context()
-s автономно асинхронным "внутренним") связанная с транспортом инициатива, при возможной (повторной) загрузке этой SUB
стороны "частной" приемной очереди (размер, опять же, не глубже, чем в соответствии с ZMQ_HWM == 1
)
Другими словами, полезные нагрузки PUB.send()
-s будут эффективно отбрасываться до тех пор, пока удаленные *_SUB.recv()
-s не будут выгружать "блокирующую" -payload из их «удаленная» - очередь приема Context()
-объекта (рассчитанная таким образом, чтобы не иметь возможности хранить какую-либо одну полезную нагрузку больше, чем единица - в соответствии с ZMQ_HWM == 1
).
Таким образом, PUB.send()
-er сработало более ~ 902601
сообщений , во время ( тайного блокирования ) тест на получение примерно 20
из них на стороне SUB
(== to_read
).
Все эти 902581+
сообщения просто выбрасывались прямо на стороне PUB
при Context()
при вызове .send()
-метод.
Как это на самом деле работает внутри? упрощенный вид внутри Context()
Учитывая приведенный выше пример макета, управляемый пул очередей Context()
растет / сжимается в соответствии с .connect()
-едровыми одноранговыми узлами, которые появляются и исчезают, однако в API ZeroMQ v2.2 имея на стороне TX и RX одинаковый потолок High Water Mark. Как задокументировано, попытки .send()
чего-либо выше этого предела отбрасываются.
TIME _____________________________
v [ ]
v [ ]
v [ ]
v [ ]
v PUB.setsockopt( ZMQ_HWM, 1 );]
v PUB.send()-s [ | ]
v : [ +-----------------QUEUE-length ( a storage depth ) is but one single message
v _________________ : [
v [ ] : [Context()-managed pool-of-QUEUE(s)
v [ ] : [
v [ ] : [ ___________________
v [ ] : [ [ ]
v FAST_SUB.connect()---:------------>[?] [ ]
v FAST_SUB.recv()-s : [?] [ ]
v : : [?] [ ]
v : : [?][?]<---SLOW_SUB.connect() ]
v : : [?][?] SLOW_SUB.recv()-s ]
v : .send(1)----->[1][1] :
| 1 <-.recv()--------------------[?][1] :
| : [?][1] :
| : .send(2)----->[2][1] :
| 2 <-.recv()--------------------[?][1] :
| : [?][1] :
| : .send(3)----->[3][1] :
| 3 <-.recv()--------------------[?][?]------------.recv()-> 1
| : [?][?] :
| : .send(4)----->[4][4] :
| 4 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(5)----->[5][4] :
| 5 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(6)----->[6][4] :
| 6 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(7)----->[7][4] :
| 7 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(8)----->[8][4] :
| 8 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(9)----->[9][4] :
| 9 <-.recv()--------------------[?][?]------------.recv()-> 4
| : [?][?] :
| : .send(A)----->[A][A] :
| A <-.recv()--------------------[?][A]
| : [?][A]
| : .send(B)----->[B][A]
| B <-.recv()--------------------[?][A]
v : [ [
v : [
v :
v
"Сообщения на быстрый абонент , начиная с на линии с сообщениями на медленный абонент "
Нет, этого не происходит . Здесь нет «очереди», но есть просто совпадение длительностей, когда fast- SUB
еще не достиг 20x .recv()
-s, до медленного (-ed) - SUB
, наконец, достигнуто после блокировка sleep( 3 )
.
Начальный «разрыв» - это просто влияние фазы sleep( 3 )
, где более медленный - SUB
не пытается ничего получить
main(){
|
| async(launch::async,fast|_fast____________|
| async(launch::async,slow| .setsockopt |_slow____________|
| ... | .setsockopt | .setsockopt |
| ... | .connect | .setsockopt |
| thread | ~~~~~~? | .connect |
| |_pub___________________| ~~~~~~? | ~~~~~~? |
| | .setsockopt | ~~~~~~? | ~~~~~~? |
| | .bind | ~~~~~~? | ~~~~~~? |
| | ~~~~~~? | ~~~~~~? | ~~~~~~? |
| | ~~~~~~=RTO | ~~~~~~? | ~~~~~~? |
| | .send()-s 1,2,..99| ~~~~~~? | ~~~~~~? |
| | .send()-s 23456,..| ~~~~~~=RTO | ~~~~~~=RTO |
| | .send()-s 25988,..| 25988 --> v[ 0]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 52522,..| 52522 --> v[ 1]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 79197,..| 79197 --> v[ 2]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 106365,..| 106365 --> v[ 3]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 132793,..| 132793 --> v[ 4]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 159236,..| 159236 --> v[ 5]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 184486,..| 184486 --> v[ 6]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 209208,..| 209208 --> v[ 7]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 234483,..| 234483 --> v[ 8]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 256122,..| 256122 --> v[ 9]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 281188,..| 281188 --> v[10]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 305855,..| 305855 --> v[11]| 305855 --> v[ 0]|// Messages on the fast subscriber starting here line up with messages on the slow subscriber
| | .send()-s 454312,..| 454312 --> v[12]| 454312 --> v[ 1]|
| | .send()-s 477807,..| 477807 --> v[13]| 477807 --> v[ 2]|
| | .send()-s 502594,..| 502594 --> v[14]| 502594 --> v[ 3]|
| | .send()-s 528551,..| 528551 --> v[15]| 528551 --> v[ 4]|
| | .send()-s 554519,..| 554519 --> v[16]| 554519 --> v[ 5]|
| | .send()-s 581419,..| 581419 --> v[17]| 581419 --> v[ 6]|
| | .send()-s 606411,..| 606411 --> v[18]| 606411 --> v[ 7]|
| | .send()-s 629298,..| 629298 --> v[19]| 629298 --> v[ 8]|
| | .send()-s 651159,..| | 651159 --> v[ 9]|
| | .send()-s 675031,..| return v | 675031 --> v[10]|
| | .send()-s 701533,..|_________________| 701533 --> v[11]|
| | .send()-s 727817,..| | 727817 --> v[12]|
| | .send()-s 754154,..| | 754154 --> v[13]|
| | .send()-s 778654,..| | 778654 --> v[14]|
| | .send()-s 804137,..| | 804137 --> v[15]|
| | .send()-s 830677,..| | 830677 --> v[16]|
| | .send()-s 854959,..| | 854959 --> v[17]|
| | .send()-s 878841,..| | 878841 --> v[18]|
| | .send()-s 902601,..| | 902601 --> v[19]|
| | .send()-s 912345,..| | |
| | .send()-s 923456,..| | return v |
| | .send()-s 934567,..| |_________________|
| | .send()-s 945678,..|
| | .send()-s 956789,..|
| | .send()-s 967890,..|
| | .send()-s 978901,..|
| | .send()-s 989012,..|
| | .send()-s 990123,..|
| | .send()-s ad inf,..|
Хотя код PUB
обязательно вызывает .send()
-s как можно быстрее, он локальный Context()
-экземпляр не зарезервировал больше места, чем для принятия только одного такого сообщения все остальные безмолвно отбрасывались всякий раз, когда соло-позиция в очереди была занята.
Всякий раз, когда маркер HWM == 1
возвращался к нулю, внутренняя механика позволяла следующему другому .send()
передавать фактическое содержимое сообщения (полезную нагрузку) в хранилище очереди и все грядущие попытки последовать за .send()
снова начали молча отбрасываться из-за логики, связанной с HWM
.