Моя цель - вычислить несколько точек данных параллельно и асинхронно записать их на устройство ввода-вывода. Точки данных не могут быть записаны сами по себе, но должны быть обернуты; Точнее, я пишу массив JSON, и символы []
вокруг структур требуются.
Мой текущий подход заключается в использовании multiprocessing.Pool
и apply_async для вычисления точек данных. Используя функцию обратного вызова, точки данных затем отправляются на multiprocessing.Queue
, в то время как отдельный поток синхронно извлекает элементы из очереди и записывает их на устройство ввода-вывода.
Может случиться так, что мне нужно отменить процесс с помощью SIGINT. В этом случае я хочу, чтобы вычисление завершилось благополучно, то есть остановило вычисление всех процессов Pool
, но завершило запись всех оставшихся элементов в очереди и символа ]
.
Пока мне не удалось найти рабочее решение моей проблемы. В текущем состоянии мои две проблемы:
- Иногда процесс не завершается.
SIGINT_handler
вызывается, но процесс не завершается. Я не могу проверить, откуда это происходит, но я предполагаю, что это может быть тупик в очереди? Я не знаю, как это предотвратить.
- Насколько я понимаю,
pool.terminate()
отправляет SIGTERM всем дочерним процессам. Это, очевидно, вызывает исключение KeyboardInterrupt в каждом из них, загромождая терминал дюжиной следов стека.
Мой код можно найти ниже.
# Initialize the worker pool and necessary variables.
pool = multiprocessing.Pool(os.cpu_count() - 1)
data_queue = multiprocessing.Queue()
counter_lock = threading.Lock()
threads_todo = args.autnum
# This function is executed after each successful experiment.
def apply_finished(data):
data_queue.put(data)
with counter_lock:
nonlocal threads_todo
threads_todo -= 1
# Start the pool.
for i in range(args.autnum):
pool.apply_async(collect_data, (args,), callback=apply_finished)
pool.close()
# This function is called if SIGINT is send to this process.
def SIGINT_handler(sig, frame):
sys.stderr.write("SIGINT received. Cancelling...")
sys.stderr.flush()
pool.terminate()
with counter_lock:
nonlocal threads_todo
threads_todo = 0
signal.signal(signal.SIGINT, SIGINT_handler)
# Write the data to stdout until all workers terminate or a SIGINT is received.
sys.stdout.write("[\n")
while threads_todo > 0 or not data_queue.empty():
try:
data = data_queue.get(True, 1)
s = data.decode('utf-8')
sys.stdout.write(s)
sys.stdout.flush()
except queue.Empty:
data = None
sys.stdout.write("]")