Я пытаюсь использовать ZeroMQ для многопроцессорной обработки. Я хочу передавать файлы из tar-файла, поэтому я использовал стример.
Ниже приведен пример того, что хотите сделать.
import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from multiprocessing import Process
def server(frontend_port, number_of_workers):
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://127.0.0.1:%d" % frontend_port)
for i in range(0,10):
socket.send_json('#%s' % i)
for i in range(number_of_workers):
socket.send_json('STOP')
return True
def worker(work_num, backend_port):
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://127.0.0.1:%d" % backend_port)
while True:
message = socket.recv_json()
if message == 'STOP':
break
print("Worker #%s got message! %s" % (work_num, message))
time.sleep(1)
def main():
frontend_port = 7559
backend_port = 7560
number_of_workers = 2
streamerdevice = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
streamerdevice.bind_in("tcp://127.0.0.1:%d" % frontend_port )
streamerdevice.bind_out("tcp://127.0.0.1:%d" % backend_port)
streamerdevice.setsockopt_in(zmq.IDENTITY, b'PULL')
streamerdevice.setsockopt_out(zmq.IDENTITY, b'PUSH')
streamerdevice.start()
processes = []
for work_num in range(number_of_workers):
w = Process(target=worker, args=(work_num,backend_port))
processes.append(w)
w.start()
time.sleep(1)
s = Process(target=server, args=(frontend_port,number_of_workers))
s.start()
# server(frontend_port)
s.join()
for w in processes:
w.join()
if __name__ == '__main__':
main()
Этот код работает правильно. Но я хочу использовать send_multipart()
для отправки кортежа или списка, включающего элементы разных типов, таких как [string, numpy_array, integer]
, но json не может обрабатывать массивы numpy
. Я избегаю использования рассола, потому что мне нужно, чтобы он был как можно быстрее. Я пытался преобразовать массив в байты, но это не сработало. (возможно я делал это неправильно, я не уверен).
Я ценю, если вы можете предоставить рабочий фрагмент кода.
В идеале я хочу сделать что-то вроде этого:
socket.send_multipart([string, numpy_array, integer])
Итак, я хочу знать, как наиболее эффективно это сделать.
Я использую Python 3,6