Я использую функцию множественного подключения ZeroMQ для подключения одного DEALER
к 2 ROUTERS
:
+----> .Connect() --> ROUTER 1
/
DEALER ---+------> .Connect() --> ROUTER 2
В моем тестеЯ отправляю 10 сообщений через DEALER
.Я получаю хорошее равномерное распределение 5 сообщений для каждого из ROUTER
-s.
Моя проблема в том, что если ROUTER 1
по какой-то причине пропадет, DEALER
все равно продолжит помещать сообщения в очередь.для этого я думаю в предположении, что ROUTER 1
в конечном итоге вернется.В итоге я получаю только 5 сообщений на ROUTER 2
.
. Мне нужно, чтобы DEALER
игнорировал отключенных или неисправных пиров.Возможно ли это?
Я пытался установить ZMQ_SNDHWM
и многие другие, но, похоже, ничего не работает.
Единственная альтернатива, которую я вижу, это выполнитьСам отказоустойчивый, с отдельными сокетами, пульсами, ACK-пакетами и т. д. Похоже, что такой базовый шаблон уже должен быть реализован ZeroMQ.
Редактировать : тестовый код
package main
import (
"github.com/pebbe/zmq4"
"time"
"log"
"fmt"
)
func receiveAll(sok *zmq4.Socket) (received int) {
poller := zmq4.NewPoller()
poller.Add(sok, zmq4.POLLIN)
for {
sockets, err := poller.Poll(100 * time.Millisecond)
if err != nil {
log.Print(err)
}
if len(sockets) > 0 {
for _, s := range sockets {
msg, _ := s.Socket.RecvMessageBytes(0)
if string(msg[1]) != "Hello World" {
log.Fatalf("Unexpected message: %s\n", msg)
}
received ++
}
} else {
return
}
}
}
func main() {
dealer, _ := zmq4.NewSocket(zmq4.DEALER)
router1, _ := zmq4.NewSocket(zmq4.ROUTER)
router2, _ := zmq4.NewSocket(zmq4.ROUTER)
router1.Bind("tcp://0.0.0.0:6667")
router2.Bind("tcp://0.0.0.0:6668")
dealer.Connect("tcp://0.0.0.0:6667")
dealer.Connect("tcp://0.0.0.0:6668")
router1.SetSubscribe("")
router2.SetSubscribe("")
dealer.SetSubscribe("")
for i := 0; i < 10; i++ {
dealer.SendBytes([]byte("Hello World"), 0)
}
time.Sleep(300 * time.Millisecond)
count1 := receiveAll(router1)
count2 := receiveAll(router2)
fmt.Printf("Blue sky scenario: count1=%d count2=%d\n", count1, count2)
// Shut down a peer
router1.Close()
time.Sleep(300 * time.Millisecond)
for i := 0; i < 10; i++ {
dealer.SendBytes([]byte("Hello World"), 0)
}
time.Sleep(300 * time.Millisecond)
count := receiveAll(router2)
fmt.Printf("Peer 1 offline: count=%d\n", count)
}