pyzmq PUB-SUB - саб получает только медленно - PullRequest
0 голосов
/ 10 июля 2020

Я хотел проверить zeroMQ как альтернативу обмену данными между несколькими процессами. Я попытался настроить очень простой шаблон PUB-SUB и передать массивы от производителя к процессу потребителя (функции recv и send взяты из https://pyzmq.readthedocs.io/en/latest/serialization.html#using -your-own-serialization ). Однако потребитель работает намного медленнее, чем производитель.

Я уже немного отладил и обнаружил, что потребитель ждет JSON в recv_array. Если я отправлю только JSON без данных, я получу ожидаемую высокую пропускную способность.

  1. Что я могу сделать, чтобы повысить производительность при отправке данных?
  2. Это правильный подход вообще?
import logging
import time
from multiprocessing import Process

import numpy as np
import zmq

logging.basicConfig(format='[%(asctime)s.%(msecs)03d] %(levelname)s '
                           '[PID:%(process)d %(name)s]: %(message)s',
                    datefmt='%Y/%m/%d-%H:%M:%S',
                    level=logging.INFO)

IMG_RESOLUTION = (1280, 1024, 3)
ADDRESS = "tcp://127.0.0.1:5557"


def send_array(socket, frame_number, data_array, flags=0, copy=True, track=False):
    """send a numpy array with metadata"""
    md = {
        'frame_number': frame_number,
        'dtype': str(data_array.dtype),
        'shape': data_array.shape,
    }

    socket.send_json(md, flags | zmq.SNDMORE)
    socket.send(data_array, flags, copy=copy, track=track)


def recv_array(socket, flags=0, copy=True, track=False):
    """recv a numpy array"""
    md = socket.recv_json(flags=flags)  # <------------------------------------ Here consumer waits
    msg = socket.recv(flags=flags, copy=copy, track=track)
    buf = memoryview(msg)
    data_array = np.frombuffer(buf, dtype=md['dtype']).reshape(md['shape'])
    return md['frame_number'], data_array


def producer():
    log = logging.getLogger('Producer')

    context = zmq.Context()
    zmq_socket = context.socket(zmq.PUB)
    zmq_socket.bind(ADDRESS)

    log.info(f'Created socket on {ADDRESS}.')
    for frame_no in range(100):
        data = np.random.randint(0, 255, IMG_RESOLUTION, dtype='uint')
        log.info(f'Sending frame {frame_no:05}.')
        send_array(zmq_socket, frame_no, data)


def consumer():
    log = logging.getLogger('Consumer')

    context = zmq.Context(io_threads=1)
    zmq_socket = context.socket(zmq.SUB)
    zmq_socket.connect(ADDRESS)
    zmq_socket.setsockopt(zmq.SUBSCRIBE, "".encode('ascii'))

    log.info(f'Created socket on {ADDRESS}.')
    while True:
        frame_no, data = recv_array(zmq_socket)
        log.info(f'Received frame {frame_no:05}.')


if __name__ == '__main__':
    consumer_proc = Process(target=consumer, daemon=True)
    consumer_proc.start()
    producer_proc = Process(target=producer, daemon=True)
    producer_proc.start()

    for i in range(5):
        time.sleep(1)

...