ZeroMQ - связь сервера N-to-N для согласования согласования - PullRequest
0 голосов
/ 06 мая 2020

Я реализую алгоритм консенсуса ( raft ), в котором 5 серверов должны асинхронно взаимодействовать друг с другом для управления реплицированным журналом. Я использую пакеты python и multiprocessing для запуска 5 процессов, действующих как серверы, и ØMQ (pyzmq) для обработки связи между серверами.

Я разработал первый проект, и мне интересно несколько вещей:

У меня 5 серверов, и я жестко запрограммировал 5 «идентификаторов», с которыми настраиваю сокеты. Для каждого сервера я использую два ROUTER сокета:

  • первый привязывается к идентификатору сервера (этот сокет отвечает за получение других сообщений)
  • второй подключается к идентификаторам других серверов (этот отвечает за отправку сообщений)

Мне кажется странным использовать два ROUTER сокета на одном узле / сервере, но я не нахожу лучше: есть ли более элегантный способ действовать?

Я пытался использовать только один ROUTER сокет на сервер, то есть сокет, который устанавливает свой идентификатор, привязывается к данному порту и подключается к другим, но этот шаг «подключения» на каждой стороне не работает. t работает: в чем причина?

Вот простой пример с 3 серверами:

import time
from threading import Thread
from typing import List

import zmq

SEND_RULE = {
    '5555': '6666',
    '6666': '7777',
    '7777': '5555'
}

def worker(socket_port: str, peer_ports: List[str]):
    context = zmq.Context()

    receiving_socket = context.socket(zmq.ROUTER)
    receiving_socket.setsockopt(zmq.IDENTITY, socket_port.encode())
    receiving_socket.bind(f'tcp://*:{socket_port}')

    sending_socket = context.socket(zmq.ROUTER)
    sending_socket.setsockopt(zmq.IDENTITY, socket_port.encode())

    for peer_port in peer_ports:
        sending_socket.connect(f'tcp://localhost:{peer_port}')

    time.sleep(1)

    recipient_id = SEND_RULE[socket_port].encode()
    message_for_recipient = b'coucou!'

    print(socket_port, ' sending a message to ', recipient_id.decode())
    sending_socket.send_multipart([recipient_id, message_for_recipient])

    # Receive a message from peer
    sender_id, sender_message = receiving_socket.recv_multipart()

    print( socket_port,
         ' server received: ',
           sender_message.decode(),
         ' from: ',
           sender_id.decode()
           )


if __name__ == '__main__':
    socket_ports = ['5555', '6666', '7777']
    for socket_port in socket_ports:
        Thread( target = worker,
                args   = ( socket_port,
                           [ port for port in socket_ports
                                   if port != socket_ports ]
                           )
                ).start()

    time.sleep(3)

...