Сеть PUB-SUB с прокси: как улучшить количество кадров в секунду - PullRequest
0 голосов
/ 11 февраля 2019

Новичок в ZMQ.Я реализовал сеть PUB-SUB с прокси-сервером, с идеей подключить несколько веб-сокетов в качестве PUB к моему прокси-серверу в агрегированный канал (SUB).Я использовал протокол inproc, так как все это происходит в одном и том же процессе.

Я написал ниже.Я получаю 400-500 FPS.Это слишком медленно.

import time
import random
import threading
import zmq

# Channels from SENDERS (PUB) to PROXY
channels = ["inproc://first", "inproc://second"]

# Channel from PROXY to RECEIVER
outbound = "inproc://to_aggregate"

numrec = 0
ctx = zmq.Context.instance()

lock = threading.Lock()

def sender(context, address):
    socket = context.socket(zmq.PUB)
    socket.connect(address)
    while True:
        twait = random.randint(1,3)
        tosend = f'{twait} from {threading.current_thread()}'.encode()
        socket.send(tosend)

def receiver(context):
    global numrec
    socket = context.socket(zmq.SUB)
    socket.connect(outbound)
    topicfilter = ''  # As string, encoded to bytes later on
    socket.setsockopt(zmq.SUBSCRIBE, topicfilter.encode())
    while True:
        resp = socket.recv()
        with lock:
            # TOOK OUT A PRINT STATEMENT, WAS SLOWING DOWN THE LOOP
            numrec += 1

def middleman(context):
    data_in = context.socket(zmq.XSUB)
    [data_in.bind(channel) for channel in channels]
    data_out = context.socket(zmq.XPUB)
    data_out.bind(outbound)
    zmq.proxy(data_in, data_out)

exchpub = threading.Thread(target=sender, name='THE_TOPLEVEL_PUBLISHER', args=(ctx, channels[0]), daemon=True)
exchpub2 = threading.Thread(target=sender, name='THE_TOPLEVEL_PUBLISHER', args=(ctx, channels[1]), daemon=True)
exchsub = threading.Thread(target=receiver, name='THE_TOPLEVEL_SUB', args=(ctx,), daemon=True)
proxy = threading.Thread(target=middleman, name='THE_PROXY', args=(ctx,), daemon=True)

threadlist = [exchpub, exchpub2, exchsub, proxy]

[i.start() for i in threadlist]


secwait = 5

t=tzero=time.time()

while t-tzero < secwait: 
    t = time.time()

with lock:
    print(f'exited here and {numrec/secwait} FPS')

Вот мой главный вопрос:

Почему это так медленно?

Последующие вопросы:

1) ZMQ docs state: "INPROC: сервер должен выполнить привязку, прежде чем любой клиент выполнит соединение ".однако независимо от порядка инициализации сбоев не наблюдается.Почему?

2) Использование send_multipart и recv_multipart замедляет мой код.(Отправка итерируемого с len == 2 замедляет его примерно вдвое). Почему я хотел бы использовать это?Учитывая небольшую разницу в скорости, я хотел бы использовать ее следующим образом: (SOURCE, PAYLOAD, TIMESTAMP).

3) Как бы вы оценили скорость одного такого кода?Предложения по реализации?

Спасибо.

...