У меня есть фид ZMQ и планировщик из пакета Schedule, который я пытаюсь вызывать в разных циклах while.
Фид zmq будет работать в течение 3 часов, после чего мой планировщик из пакета Schedule запустит мою функцию обработкикоторый корректно останавливает процесс ZMQ (в настоящее время использует многопроцессорность), выполняет фактическую обработку данных, а затем перезапускает процесс в конце функции.
Причина, по которой я это делаю в настоящее время, заключается в том, что я считаю, что проблема вмногопоточность непрерывно добавляет данные фида zmq к фрейму данных во время фактической обработки данных.
Так что это приводит меня к использованию мультиобработки.Я пробовал просто создавать процессы из функций, затем запускать их и вызывать join
process = Process(target = run, args = ('bob',))
process1 = Process(target = schedule1, args = 'bob',)
process.start()
process1.start()
process.join()
process1.join()
и
with Pool(2) as p:
p.map(process, "1")
p.map(process1, "1")
run и расписание 1 - мои функции
with Pool(2) as p:
p.map(schedule1, "1")
p.map(run, "1")
Только что попробовал это выше, и теперь он просто застрял, ничего не печатая из моих циклов while, сообщая мне, что он активен.Так что не уверен, что здесь происходит
и
pool = Pool()
result1 = pool.apply_async(schedule, '1')
result2 = pool.apply_async(run, '1')
Они все не работают и не запускают ни одного из моих двух циклов while.Хотя многопоточность может, по крайней мере, сделать это.Причина, по которой я перешел на многопроцессорность, заключалась в том, что я могу легко и просто остановить и начать обработку с помощью .terminate (), затем .join (), а затем перезапустить с помощью .start () .join ()
Любая помощь будет полезна или любые указатели на то, где я ошибаюсь.
Аргументы во всех функциях являются фиктивными аргументами, встроенными для .map, потому что вам нужно передавать аргументы.