У меня есть проект python3 (3.4.5), над которым я работаю, который использует multiprocessing.Pool
для выполнения более 50 заданий через 4 рабочих. У меня есть отдельная настройка процесса с logging.handlers.QueueListener
, поэтому я могу записывать глобальные данные в один файл с помощью Queue
, используемого с multiprocessing.Manager()
. Таким образом, в основном поток выглядит так
- Основная программа запускается
- Создать
Queue
через multiprocessing.Manager()
- Начните выделенный процесс регистрации с
QueueListener
прослушивания Queue
, который я только что создал для глобального журнала. (Я также попробовал это, просто используя поток из основной программы с теми же результатами.)
- Создайте
multiprocessing.Pool
для обработки отдельных заданий, передавая им Queue
, созданный ранее, и необходимую информацию о конфигурации для запуска и настройки их ведения журнала (есть глобальный журнал, а также отдельный журнал для каждого задания с более детальной информацией). ). Задания начинаются с map_async
.
- Подождите, пока все задания будут обработаны, затем выполните некоторые последние шаги и выполните очистку.
Я получаю неустойчивую ошибку на некоторых заданиях, хотя обычно на 1 задании возникает ошибка (каждый раз разная), иногда встречаются 2 одинаковые ошибки или ноль. Насколько я могу судить, не код в заданиях вызывает ошибку, а что-то в настройках multiprocessing
или logging
. Вот пример ошибки, которую я получаю:
--- Logging error ---
Traceback (most recent call last):
File "/usr/lib64/python3.4/logging/handlers.py", line 1347, in emit
self.enqueue(self.prepare(record))
File "/usr/lib64/python3.4/logging/handlers.py", line 1313, in enqueue
self.queue.put_nowait(record)
File "<string>", line 2, in put_nowait
File "/usr/lib64/python3.4/multiprocessing/managers.py", line 731, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/lib64/python3.4/multiprocessing/connection.py", line 206, in send
self._send_bytes(ForkingPickler.dumps(obj))
File "/usr/lib64/python3.4/multiprocessing/connection.py", line 413, in _send_bytes
self._send(chunk)
File "/usr/lib64/python3.4/multiprocessing/connection.py", line 369, in _send
n = write(self._handle, buf)
TypeError: an integer is required (got type NoneType)
Call stack:
File "./sampling__test__py.py", line 100, in <module>
run_pool = multiprocessing.Pool(4)
File "/usr/lib64/python3.4/multiprocessing/context.py", line 118, in Pool
context=self.get_context())
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 168, in __init__
self._repopulate_pool()
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 233, in _repopulate_pool
w.start()
File "/usr/lib64/python3.4/multiprocessing/process.py", line 105, in start
self._popen = self._Popen(self)
File "/usr/lib64/python3.4/multiprocessing/context.py", line 267, in _Popen
return Popen(process_obj)
File "/usr/lib64/python3.4/multiprocessing/popen_fork.py", line 21, in __init__
self._launch(process_obj)
File "/usr/lib64/python3.4/multiprocessing/popen_fork.py", line 77, in _launch
code = process_obj._bootstrap()
File "/usr/lib64/python3.4/multiprocessing/process.py", line 254, in _bootstrap
self.run()
File "/usr/lib64/python3.4/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "/home/username/value_check.py", line 338, in value_check
global_logger.info("SplitTime: {str_timeDelta} -- COMPLETED: {Check_Name} --- Total Txn Count: {var_Total_Txn_Count} --- Criteria Txn Count: {var_Criteria_Txn_Count} --- Threshold: {Threshold} --- Low_Vol Threshold: {LowVolThresh}".format(str_timeDelta = timeDelta(datetime.now() - YAML_Config['start_time']), **YAML_Config))
Message: 'SplitTime: 00:01:05,031 -- COMPLETED: ALPHA_CHECK --- Total Txn Count: 1234--- Criteria Txn Count: 0 --- Threshold: 10 --- Low_Vol Threshold: 0'
Arguments: None
Ошибка в коде относится к объекту протоколирования в моем коде, но даже когда я помещаю try/except
логику вокруг вызова, он ничего не делает, ошибка, кажется, происходит вверх по течению. Я также попытался изменить то, что регистрируется, от форматированной строки до простой строки, но безрезультатно. Похоже, что где-то по пути отдельные задания либо теряют связь с Queue
, либо что-то в Queue
выходит из строя и вызывает проблемы.
Есть идеи? Я работал над тем, чтобы получить более новую версию Python, которая была бы полезна по ряду причин (в частности, f-строк), но я не знаю, решит ли это эту проблему, и у меня заканчивается идеи устранения неполадок.