Создать группу стримеров и издателей - PullRequest
6 голосов
/ 14 марта 2019

У меня есть несколько узлов, которые будут использовать вторичный сервис, чтобы получать информацию об адресе друг друга.Я хочу иметь возможность публиковать информацию, чтобы все остальные узлы могли ее слышать.Использование сокета XPUB - это не вариант, который я хотел бы использовать здесь, так как я хочу, чтобы эта система распространялась.

То, что я пробовал, это то, что суммирует до:

1 Создание сокета PUB,

def pub_stream(self):
     self.pub = self.context.socket(zmq.PUB)
     self.pub.bind(self.endpoint)

2 Создание потока SUB,

def sub_stream(self):
    ioloop = IOLoop.instance()
    socket = self.context.socket(zmq.SUB)
    self.sub_stream = ZMQStream(socket, ioloop)
    self.sub_stream.on_recv(self.on_message)
    self.subs_stream.setsockopt(zmq.SUBSCRIBE, self.topic)

3 В какой-то момент получить адреса всех других узлов и подключиться к ним,

# close and restart sub_stream to get rid of any previous connections
for endpoint in endpoints:
    self.sub_stream.connect(endpoint)

Хотя сообщения обратного вызова on_message не передаются.Правильно ли то, что я делаю, если нет, то как лучше сделать то, чего я хочу достичь?

1 Ответ

0 голосов
/ 18 марта 2019

Какой бы маршрут вы ни выбрали, вам нужен хотя бы один фиксированный адрес для подключения, если у вас нет доступа к многоадресной рассылке и т. Д.

В качестве отдельной сети обнаружения я бы использовал простой прокси-сервер X (pub / sub), позволяющий новым узлам решать, какие другие узлы представляют интерес на основе предмета.

Простая версия

  • Создание одного прокси XSUB / XPUB с фиксированным адресом на каждой стороне (DNS и т. Д.)
  • Узел запускается
    • подключается к порту прокси-сервера XSUB и передает его тему, адрес и порт данных
    • подключается к порту XPUB и подписывается на узлы, представляющие интерес
      • Получив информацию о соединении, он соединяет свой сокет данных с сокетом данных узла на основе информации о соединении.

Надежная версия

  • Добавление нескольких прокси-серверов обнаружения с балансировщиком нагрузки / виртуальными IP-адресами для обеспечения отказоустойчивости и т. Д.
  • Узел должен отправить сообщение об обнаружении по таймеру.
    • Разрешить соединению поздних узлов
    • Разрешить подключенным узлам обнаруживать сбойные узлы (кроме случаев ожидания tcp)

Гибридная версия

В реальности / производстве я использую zeromq вместе с любыми другими (более подходящими) услугами. Таким образом, я не пытаюсь заново изобрести колесо (для обнаружения), а просто ноль для работы с подпиской / данными.

Например, если я распространяю свои системы по многим регионам с помощью облачного провайдера, почему бы не использовать предоставляемые ими службы обнаружения.

AWS (как один гибридный пример)

Для AWS я использую следующие компоненты, чтобы разрешить обнаружение / надежность / отработка отказа

  • Route53 (DNS) с проверками работоспособности
  • Облачная карта (DNS ish)
  • Упругие балансировщики нагрузки (интерфейс для любых пользовательских служб обнаружения)
...