Отказоустойчивое циклическое переключение ZeroMQ на отключенных узлах - PullRequest
0 голосов
/ 31 мая 2018

Я использую функцию множественного подключения ZeroMQ для подключения одного DEALER к 2 ROUTERS:

            +----> .Connect() --> ROUTER 1
           /
DEALER ---+------> .Connect() --> ROUTER 2

В моем тестеЯ отправляю 10 сообщений через DEALER.Я получаю хорошее равномерное распределение 5 сообщений для каждого из ROUTER -s.

Моя проблема в том, что если ROUTER 1 по какой-то причине пропадет, DEALER все равно продолжит помещать сообщения в очередь.для этого я думаю в предположении, что ROUTER 1 в конечном итоге вернется.В итоге я получаю только 5 сообщений на ROUTER 2.

. Мне нужно, чтобы DEALER игнорировал отключенных или неисправных пиров.Возможно ли это?

Я пытался установить ZMQ_SNDHWM и многие другие, но, похоже, ничего не работает.

Единственная альтернатива, которую я вижу, это выполнитьСам отказоустойчивый, с отдельными сокетами, пульсами, ACK-пакетами и т. д. Похоже, что такой базовый шаблон уже должен быть реализован ZeroMQ.


Редактировать : тестовый код

package main

import (
    "github.com/pebbe/zmq4"
    "time"
    "log"
    "fmt"
)

func receiveAll(sok *zmq4.Socket) (received int) {
    poller := zmq4.NewPoller()
    poller.Add(sok, zmq4.POLLIN)

    for {
        sockets, err := poller.Poll(100 * time.Millisecond)
        if err != nil {
            log.Print(err)
        }
        if len(sockets) > 0 {
            for _, s := range sockets {
                msg, _ := s.Socket.RecvMessageBytes(0)
                if string(msg[1]) != "Hello World" {
                    log.Fatalf("Unexpected message: %s\n", msg)
                }
                received ++
            }
        } else {
            return
        }
    }
}

func main() {

    dealer, _ := zmq4.NewSocket(zmq4.DEALER)
    router1, _ := zmq4.NewSocket(zmq4.ROUTER)
    router2, _ := zmq4.NewSocket(zmq4.ROUTER)

    router1.Bind("tcp://0.0.0.0:6667")
    router2.Bind("tcp://0.0.0.0:6668")

    dealer.Connect("tcp://0.0.0.0:6667")
    dealer.Connect("tcp://0.0.0.0:6668")

    router1.SetSubscribe("")
    router2.SetSubscribe("")
    dealer.SetSubscribe("")

    for i := 0; i < 10; i++ {
        dealer.SendBytes([]byte("Hello World"), 0)
    }

    time.Sleep(300 * time.Millisecond)

    count1 := receiveAll(router1)
    count2 := receiveAll(router2)

    fmt.Printf("Blue sky scenario: count1=%d count2=%d\n", count1, count2)

    // Shut down a peer
    router1.Close()
    time.Sleep(300 * time.Millisecond)

    for i := 0; i < 10; i++ {
        dealer.SendBytes([]byte("Hello World"), 0)
    }

    time.Sleep(300 * time.Millisecond)

    count := receiveAll(router2)

    fmt.Printf("Peer 1 offline: count=%d\n", count)

}

1 Ответ

0 голосов
/ 31 мая 2018

Мне нужно, чтобы DEALER игнорировал отключенные или неисправные одноранговые узлы. Возможно ли это?

Да, конечно.Необходимо настроить значения по умолчанию (неактивные), используя настройки для конкретного случая использования в:

  • a .setsockopt( ZMQ.IMMEDIATE, 1 ), чтобы не буферизовать сообщение-экземпляры для сверстников, которые не кажутся «живыми»
  • a .setsockopt( ZMQ.HEARTBEAT_IVL, <ms> ) для отправки пульса
  • a .setsockopt( ZMQ.HEARTBEAT_TTL, <ms> ) для настройки времени жизни
  • a .setsockopt( ZMQ.HEARTBEAT_TIMEOUT, <ms>) для порога времени ожидания
  • a .setsockopt( ZMQ.HANDSHAKE_IVL, <ms> ) для управления (пере) установлением тайм-аутов.

Для получения подробной информации, проверьте привязку вашего языка и какую версию API на самом деле он используетпод капотом.Большинство этих настроек доступны начиная с native-API v 3.x, последняя документация по native-API v 4.2.2 поможет вам настроить значения и стратегии конфигурации.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...