Привет, я ушел новичком в c многопоточности и обработки в python.
В моей текущей проблеме у меня есть 4 разных задачи, которые должны выполняться в бесконечном l oop.
Время выполнения этих задач составляет примерно
- T1 (зеленый): 0,4 с
- T2 (синий): ~ 0,6-0,8 с
- T3 (черный): ~ 0,6-0,8 с
- T4 (красный): 0,02 с
The latest resuts from T1 and T4 are input paramertes for T2 and T3.
T2/T3 should always start together when the results from T1 and T4 are there and the results from T2 and T3 should be merged to the endresult
Furthermote to reduce the runtime of T2 and T3 they always should run in parallel / on there own process.
pseudocode:
while true:
result4 = worker4().run().result()
if task1.done()
result1 = task1.result()
task1 = worker1().run()
if (result1 and result4) and (task2.stopped() and task3.stopped()):
task2 = worker2(result1).run()
task3 = worker3(result2).run()
if task2.done() and task3.done():
endresult = task2.result() + task3.result()
Я сделал несколько попыток решить проблему с помощью multiprocessing и ThreadPoolExecutor , но я не нашел хорошего решения для их запуска в бесконечном l oop.
Solution1 с ThreadPoolExecutor:
def worker1():
time.sleep(50)
return "result1"
def worker2(result):
time.sleep(80)
return "endresult" + result
def worker4():
return "result4"
result1, result2, thread1, thread2, thread3= None, None, None, None, None
with ThreadPoolExecutor(max_workers=4) as executor:
while True:
result4 = worker4()
if thread1 is None:
thread1 = executor.submit(worker1)
if thread1.done():
result1 = thread1.result()
if (thread2 is None and thread3 is None) and (result1 not None and result2 not None):
thread2 = executor.submit(worker3, result1)
thread3 = executor.submit(worker3, result2)
result1, result2 = None, None
if (thread2 is not None and thread3 is not None)
if thread2.done() and thread3.done():
frameDict = {}
result = [thread2.result(), thread3.result()]
thread2, thread3 = None, None
time.sleep(10)
Это решение вроде сработало для меня, но потоки 2 и 3 были медленными / не параллельными
Итак, я попробовал реализовать с многопроцессорной обработкой, но есть проблема, которую я не знаю, когда процесс завершен.
def worker1():
time.sleep(50)
return "result1"
def worker2(q, result):
time.sleep(80)
q.put( "endresult" + result)
def worker4(q):
q.put("result4")
result1, result4, thread1, thread2, thread3= None, None, None, None, None
X = 1 # When 0 Task 2 and 3 will run very slow
while True:
result4 = worker4()
if thread1 is None:
thread1 = multiprocessing.Process(target=worker1 , args=(q, ))
thread1.join(X)
if (thread2 is None and thread3 is None) and (thread1.is_alive() == False and result4 not None):
thread2 = multiprocessing.Process(target=worker2, args=(q, q.get()))
thread2.start()
thread3 = multiprocessing.Process(target=worker2, args=(q, result4))
thread3.start()
result1, result4 = None, None
thread2.join(X)
thread3.join(X)
if thread2 is not None and thread3 is not None:
if thread2.is_alive() == False and thread3.is_alive() == False:
endresult = [q.get(), q.get()]
time.sleep(10)
Примечания:
- Когда я использую
process.join(1)
, основной поток с конфликтующим рабочим будет приостановлен на короткое время, что должно не происходит. thread1.is_alive()
должно измениться на False после соединения, но этого не происходит. В качестве альтернативы я использовал q.qsize()
, чтобы проверить, есть ли результаты или нет.
Я был бы счастлив, если бы кто-нибудь мог предоставить мне руководство или базовый c пример того, как работать с этими процессами в бесконечном л oop.