Я хотел проверить zeroMQ как альтернативу обмену данными между несколькими процессами. Я попытался настроить очень простой шаблон PUB-SUB и передать массивы от производителя к процессу потребителя (функции recv и send взяты из https://pyzmq.readthedocs.io/en/latest/serialization.html#using -your-own-serialization ). Однако потребитель работает намного медленнее, чем производитель.
Я уже немного отладил и обнаружил, что потребитель ждет JSON в recv_array
. Если я отправлю только JSON без данных, я получу ожидаемую высокую пропускную способность.
- Что я могу сделать, чтобы повысить производительность при отправке данных?
- Это правильный подход вообще?
import logging
import time
from multiprocessing import Process
import numpy as np
import zmq
logging.basicConfig(format='[%(asctime)s.%(msecs)03d] %(levelname)s '
'[PID:%(process)d %(name)s]: %(message)s',
datefmt='%Y/%m/%d-%H:%M:%S',
level=logging.INFO)
IMG_RESOLUTION = (1280, 1024, 3)
ADDRESS = "tcp://127.0.0.1:5557"
def send_array(socket, frame_number, data_array, flags=0, copy=True, track=False):
"""send a numpy array with metadata"""
md = {
'frame_number': frame_number,
'dtype': str(data_array.dtype),
'shape': data_array.shape,
}
socket.send_json(md, flags | zmq.SNDMORE)
socket.send(data_array, flags, copy=copy, track=track)
def recv_array(socket, flags=0, copy=True, track=False):
"""recv a numpy array"""
md = socket.recv_json(flags=flags) # <------------------------------------ Here consumer waits
msg = socket.recv(flags=flags, copy=copy, track=track)
buf = memoryview(msg)
data_array = np.frombuffer(buf, dtype=md['dtype']).reshape(md['shape'])
return md['frame_number'], data_array
def producer():
log = logging.getLogger('Producer')
context = zmq.Context()
zmq_socket = context.socket(zmq.PUB)
zmq_socket.bind(ADDRESS)
log.info(f'Created socket on {ADDRESS}.')
for frame_no in range(100):
data = np.random.randint(0, 255, IMG_RESOLUTION, dtype='uint')
log.info(f'Sending frame {frame_no:05}.')
send_array(zmq_socket, frame_no, data)
def consumer():
log = logging.getLogger('Consumer')
context = zmq.Context(io_threads=1)
zmq_socket = context.socket(zmq.SUB)
zmq_socket.connect(ADDRESS)
zmq_socket.setsockopt(zmq.SUBSCRIBE, "".encode('ascii'))
log.info(f'Created socket on {ADDRESS}.')
while True:
frame_no, data = recv_array(zmq_socket)
log.info(f'Received frame {frame_no:05}.')
if __name__ == '__main__':
consumer_proc = Process(target=consumer, daemon=True)
consumer_proc.start()
producer_proc = Process(target=producer, daemon=True)
producer_proc.start()
for i in range(5):
time.sleep(1)