У меня есть скрипт Python 2.7, в котором существуют два обработчика журналов, один для файла и один для стандартного вывода. В сценарии точки входа я порождаю несколько процессов с помощью функции multiprocessing.Pool.apply_async. Затем в каждом отдельном процессе я запускаю concurrent.futures.ThreadPoolExecutor, чтобы создать несколько потоков для выполнения моей работы. Я пытался убедиться, что ничего не распределяется между процессами (кроме ведения журнала). Тем не менее, время от времени я получаю ValueError, «Операция ввода-вывода для закрытого файла». В трассировке это происходит при вызове sys.stdout.flush () при создании пула, как в Pool (5).
Важно отметить, что ошибка, которую я включил ниже, по-видимому, произошла в родительском процессе и даже до создания дочерних процессов.
Я реализовал библиотеку Я нашел синхронизировать записи в один файл. Похоже, это привело к исчезновению одной версии ValueError, но у меня осталась другая версия, которая кажется подозрительно связанной с stdout (как упомянуто выше).
Настройка ведения журнала:
MODULE_DIR = os.path.dirname(__file__)
LOG_FILE = os.path.join(MODULE_DIR, 'my_test.log')
LOG_FMT = logging.Formatter('%(asctime)s [%(levelname)8s]: %(message)s')
LOG = logging.getLogger()
LOG.setLevel(logging.INFO)
fh = logging.FileHandler(LOG_FILE)
fh.setFormatter(LOG_FMT)
fh.setLevel(logging.INFO)
LOG.addHandler(fh)
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(LOG_FMT)
ch.setLevel(logging.INFO)
LOG.addHandler(ch)
Создание пула. Цикл while и обработчик сигнала были помещены туда, предложив еще одну запись о переполнении стека, чтобы позволить элементу control-c быть пойманным в родительском процессе и завершить пул.
oh = signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = Pool(5)
signal.signal(signal.SIGINT, oh)
try:
for item in items:
pool.apply_async(process_item, (args, item), callback=cb)
pool.close()
sleep_count = 0
while ITEM_COUNT != len(items):
time.sleep(1)
sleep_count += 1
except (KeyboardInterrupt, SystemExit):
LOG.info('Terminating all processes because interrupt was received.')
pool.terminate()
Создание темы:
with ThreadPoolExecutor(max_workers=10) as pool:
for x in xes:
work = pool.submit(handle_x, args)
workers.append(work)
Traceback:
File “myscript.py", line 153, in create_item_workers
pool = Pool (5)
File "<redacted>/multiprocessing/_ init__.py”, line 232, in
p-fatan
return Pool(processes, initializer, initargs, maxtasksperchild)
File "<redacted>/multiprocessing/pool.py”, line 159, in in
Bh v4
“self._repopulate_pool ()
File "<redacted>/multiprocessing/pool.py”, line 223, in rep
opulate_pool
w.start()
File "<redacted>/multiprocessing/process.py”, line 130, ins
tart
self. popen = Popen(self)
File "<redacted>/multiprocessing/forking.py”, line 127, in _
_init__
sys.stdout. flush ()
File "<redacted>/StringIO.py”, line 256, in flush
_complain_ifclosed (self.closed)
File "<redacted>/StringIO.py”, line 40, in complain_ifclose
a
raise ValueError, "I/O operation on closed file”
ValueError: I/0 operation on closed file
Кто-нибудь знает, связано ли это с механизмами регистрации, или здесь что-то еще в корне не так?