Запись многопроцессных и многопоточных скриптов Python в стандартный вывод и один файл - PullRequest
0 голосов
/ 26 июня 2019

У меня есть скрипт Python 2.7, в котором существуют два обработчика журналов, один для файла и один для стандартного вывода. В сценарии точки входа я порождаю несколько процессов с помощью функции multiprocessing.Pool.apply_async. Затем в каждом отдельном процессе я запускаю concurrent.futures.ThreadPoolExecutor, чтобы создать несколько потоков для выполнения моей работы. Я пытался убедиться, что ничего не распределяется между процессами (кроме ведения журнала). Тем не менее, время от времени я получаю ValueError, «Операция ввода-вывода для закрытого файла». В трассировке это происходит при вызове sys.stdout.flush () при создании пула, как в Pool (5).

Важно отметить, что ошибка, которую я включил ниже, по-видимому, произошла в родительском процессе и даже до создания дочерних процессов.

Я реализовал библиотеку Я нашел синхронизировать записи в один файл. Похоже, это привело к исчезновению одной версии ValueError, но у меня осталась другая версия, которая кажется подозрительно связанной с stdout (как упомянуто выше).

Настройка ведения журнала:

MODULE_DIR = os.path.dirname(__file__)
LOG_FILE = os.path.join(MODULE_DIR, 'my_test.log')
LOG_FMT = logging.Formatter('%(asctime)s [%(levelname)8s]: %(message)s')
LOG = logging.getLogger()
LOG.setLevel(logging.INFO)
fh = logging.FileHandler(LOG_FILE)
fh.setFormatter(LOG_FMT)
fh.setLevel(logging.INFO)
LOG.addHandler(fh)
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(LOG_FMT)
ch.setLevel(logging.INFO)
LOG.addHandler(ch)

Создание пула. Цикл while и обработчик сигнала были помещены туда, предложив еще одну запись о переполнении стека, чтобы позволить элементу control-c быть пойманным в родительском процессе и завершить пул.

oh = signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = Pool(5)
signal.signal(signal.SIGINT, oh)
try:
    for item in items:
        pool.apply_async(process_item, (args, item), callback=cb)
    pool.close()
    sleep_count = 0
    while ITEM_COUNT != len(items):
        time.sleep(1)
        sleep_count += 1
except (KeyboardInterrupt, SystemExit):
    LOG.info('Terminating all processes because interrupt was received.')
    pool.terminate()

Создание темы:

with ThreadPoolExecutor(max_workers=10) as pool:
    for x in xes:
        work = pool.submit(handle_x, args)
        workers.append(work)

Traceback:

File “myscript.py", line 153, in create_item_workers
pool = Pool (5)
File "<redacted>/multiprocessing/_ init__.py”, line 232, in
p-fatan
return Pool(processes, initializer, initargs, maxtasksperchild)
File "<redacted>/multiprocessing/pool.py”, line 159, in in
Bh v4
“self._repopulate_pool ()
File "<redacted>/multiprocessing/pool.py”, line 223, in rep
opulate_pool
w.start()
File "<redacted>/multiprocessing/process.py”, line 130, ins
tart
self. popen = Popen(self)
File "<redacted>/multiprocessing/forking.py”, line 127, in _
_init__
sys.stdout. flush ()
File "<redacted>/StringIO.py”, line 256, in flush
_complain_ifclosed (self.closed)
File "<redacted>/StringIO.py”, line 40, in complain_ifclose
a
raise ValueError, "I/O operation on closed file”
ValueError: I/0 operation on closed file

Кто-нибудь знает, связано ли это с механизмами регистрации, или здесь что-то еще в корне не так?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...