Я впервые серьезно играю с параллельными вычислениями.
Я использую модуль multiprocessing
в Python и сталкиваюсь с этой проблемой:
Потребитель очереди запускается в другом процессе, нежели производитель очереди, первый должен дождаться, пока последний завершит свою работу, прежде чем прекратить итерации по очереди. Иногда потребитель быстрее, чем производитель, и очередь остается пустой.
Если я не поставлю условие, программа не остановится.
В примере кода я использую шаблон PRODUCER_IS_OVER
для примера того, что мне нужно.
следующий код набросок проблемы:
def save_data(save_que, file_):
### Coroutine instantiation
PRODUCER_IS_OVER = False
empty = False
### Queue consumer
while not(empty and PRODUCER_IS_OVER):
try:
data = save_que.get()
print("saving data",data)
except:
empty = save_que.empty()
print(empty)
pass
#PRODUCER_IS_OVER = get_condition()
print ("All data saved")
return
def get_condition():
###NameError: global name 'PRODUCER_IS_OVER' is not defined
if PRODUCER_IS_OVER:
return True
else:
return False
def produce_data(save_que):
for _ in range(5):
time.sleep(random.randint(1,5))
data = random.randint(1,10)
print("sending data", data)
save_que.put(data)
### Main function here
import random
import time
from multiprocessing import Queue, Manager, Process
manager = Manager()
save_que = manager.Queue()
file_ = "file"
save_p = Process(target= save_data, args=(save_que, file_))
save_p.start()
PRODUCER_IS_OVER = False
produce_data(save_que)
PRODUCER_IS_OVER = True
save_p.join()
produce_data
занимает переменное время, и я хочу, чтобы процесс save_p запускался ПЕРЕД заполнением очереди, чтобы использовать очередь, пока она заполнена.
Я думаю, что есть обходной путь, чтобы сообщить, когда прекратить итерацию, но я хочу знать, существует ли правильный способ сделать это.
Я пробовал и мультипроцессорную обработку .Pipe и .Lock, но я не знаю, как реализовать правильно и эффективно.
решено: это лучший способ?
следующий код реализует STOPMESSAGE в Q, работает нормально, я могу уточнить его с помощью класса QMsg
, если язык поддерживает только статические типы.
def save_data(save_que, file_):
# Coroutine instantiation
PRODUCER_IS_OVER = False
empty = False
# Queue consumer
while not(empty and PRODUCER_IS_OVER):
data = save_que.get()
empty = save_que.empty()
print("saving data", data)
if data == "STOP":
PRODUCER_IS_OVER = True
print("All data saved")
return
def get_condition():
# NameError: global name 'PRODUCER_IS_OVER' is not defined
if PRODUCER_IS_OVER:
return True
else:
return False
def produce_data(save_que):
for _ in range(5):
time.sleep(random.randint(1, 5))
data = random.randint(1, 10)
print("sending data", data)
save_que.put(data)
save_que.put("STOP")
# Main function here
import random
import time
from multiprocessing import Queue, Manager, Process
manager = Manager()
save_que = manager.Queue()
file_ = "file"
save_p = Process(target=save_data, args=(save_que, file_))
save_p.start()
PRODUCER_IS_OVER = False
produce_data(save_que)
PRODUCER_IS_OVER = True
save_p.join()
Но это не может работать, если очередь создается несколькими отдельными процессами: кто будет отправлять сообщение ALT в этом случае?
другое решение - сохранить индекс процессов в списке и выполнить:
def some_alive():
for p in processes:
if p.is_alive():
return True
return False
Но multiprocessing
поддерживает метод .is_alive
только в родительском процессе, что является ограничением в моем случае.
спасибо