Новичок в ZMQ.Я реализовал сеть PUB-SUB с прокси-сервером, с идеей подключить несколько веб-сокетов в качестве PUB к моему прокси-серверу в агрегированный канал (SUB).Я использовал протокол inproc, так как все это происходит в одном и том же процессе.
Я написал ниже.Я получаю 400-500 FPS.Это слишком медленно.
import time
import random
import threading
import zmq
# Channels from SENDERS (PUB) to PROXY
channels = ["inproc://first", "inproc://second"]
# Channel from PROXY to RECEIVER
outbound = "inproc://to_aggregate"
numrec = 0
ctx = zmq.Context.instance()
lock = threading.Lock()
def sender(context, address):
socket = context.socket(zmq.PUB)
socket.connect(address)
while True:
twait = random.randint(1,3)
tosend = f'{twait} from {threading.current_thread()}'.encode()
socket.send(tosend)
def receiver(context):
global numrec
socket = context.socket(zmq.SUB)
socket.connect(outbound)
topicfilter = '' # As string, encoded to bytes later on
socket.setsockopt(zmq.SUBSCRIBE, topicfilter.encode())
while True:
resp = socket.recv()
with lock:
# TOOK OUT A PRINT STATEMENT, WAS SLOWING DOWN THE LOOP
numrec += 1
def middleman(context):
data_in = context.socket(zmq.XSUB)
[data_in.bind(channel) for channel in channels]
data_out = context.socket(zmq.XPUB)
data_out.bind(outbound)
zmq.proxy(data_in, data_out)
exchpub = threading.Thread(target=sender, name='THE_TOPLEVEL_PUBLISHER', args=(ctx, channels[0]), daemon=True)
exchpub2 = threading.Thread(target=sender, name='THE_TOPLEVEL_PUBLISHER', args=(ctx, channels[1]), daemon=True)
exchsub = threading.Thread(target=receiver, name='THE_TOPLEVEL_SUB', args=(ctx,), daemon=True)
proxy = threading.Thread(target=middleman, name='THE_PROXY', args=(ctx,), daemon=True)
threadlist = [exchpub, exchpub2, exchsub, proxy]
[i.start() for i in threadlist]
secwait = 5
t=tzero=time.time()
while t-tzero < secwait:
t = time.time()
with lock:
print(f'exited here and {numrec/secwait} FPS')
Вот мой главный вопрос:
Почему это так медленно?
Последующие вопросы:
1) ZMQ docs state: "INPROC: сервер должен выполнить привязку, прежде чем любой клиент выполнит соединение ".однако независимо от порядка инициализации сбоев не наблюдается.Почему?
2) Использование send_multipart и recv_multipart замедляет мой код.(Отправка итерируемого с len == 2 замедляет его примерно вдвое). Почему я хотел бы использовать это?Учитывая небольшую разницу в скорости, я хотел бы использовать ее следующим образом: (SOURCE, PAYLOAD, TIMESTAMP).
3) Как бы вы оценили скорость одного такого кода?Предложения по реализации?
Спасибо.