Ошибка регистрации многопроцессорной обработки Python - TypeError: требуется целое число (получил тип NoneType) - PullRequest
0 голосов
/ 14 января 2019

У меня есть проект python3 (3.4.5), над которым я работаю, который использует multiprocessing.Pool для выполнения более 50 заданий через 4 рабочих. У меня есть отдельная настройка процесса с logging.handlers.QueueListener, поэтому я могу записывать глобальные данные в один файл с помощью Queue, используемого с multiprocessing.Manager(). Таким образом, в основном поток выглядит так

  1. Основная программа запускается
  2. Создать Queue через multiprocessing.Manager()
  3. Начните выделенный процесс регистрации с QueueListener прослушивания Queue, который я только что создал для глобального журнала. (Я также попробовал это, просто используя поток из основной программы с теми же результатами.)
  4. Создайте multiprocessing.Pool для обработки отдельных заданий, передавая им Queue, созданный ранее, и необходимую информацию о конфигурации для запуска и настройки их ведения журнала (есть глобальный журнал, а также отдельный журнал для каждого задания с более детальной информацией). ). Задания начинаются с map_async.
  5. Подождите, пока все задания будут обработаны, затем выполните некоторые последние шаги и выполните очистку.

Я получаю неустойчивую ошибку на некоторых заданиях, хотя обычно на 1 задании возникает ошибка (каждый раз разная), иногда встречаются 2 одинаковые ошибки или ноль. Насколько я могу судить, не код в заданиях вызывает ошибку, а что-то в настройках multiprocessing или logging. Вот пример ошибки, которую я получаю:

--- Logging error ---
Traceback (most recent call last):
  File "/usr/lib64/python3.4/logging/handlers.py", line 1347, in emit
    self.enqueue(self.prepare(record))
  File "/usr/lib64/python3.4/logging/handlers.py", line 1313, in enqueue
    self.queue.put_nowait(record)
  File "<string>", line 2, in put_nowait
  File "/usr/lib64/python3.4/multiprocessing/managers.py", line 731, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib64/python3.4/multiprocessing/connection.py", line 206, in send
    self._send_bytes(ForkingPickler.dumps(obj))
  File "/usr/lib64/python3.4/multiprocessing/connection.py", line 413, in _send_bytes
    self._send(chunk)
  File "/usr/lib64/python3.4/multiprocessing/connection.py", line 369, in _send
    n = write(self._handle, buf)
TypeError: an integer is required (got type NoneType)
Call stack:
  File "./sampling__test__py.py", line 100, in <module>
    run_pool     = multiprocessing.Pool(4)
  File "/usr/lib64/python3.4/multiprocessing/context.py", line 118, in Pool
    context=self.get_context())
  File "/usr/lib64/python3.4/multiprocessing/pool.py", line 168, in __init__
    self._repopulate_pool()
  File "/usr/lib64/python3.4/multiprocessing/pool.py", line 233, in _repopulate_pool
    w.start()
  File "/usr/lib64/python3.4/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/usr/lib64/python3.4/multiprocessing/context.py", line 267, in _Popen
    return Popen(process_obj)
  File "/usr/lib64/python3.4/multiprocessing/popen_fork.py", line 21, in __init__
    self._launch(process_obj)
  File "/usr/lib64/python3.4/multiprocessing/popen_fork.py", line 77, in _launch
    code = process_obj._bootstrap()
  File "/usr/lib64/python3.4/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/usr/lib64/python3.4/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib64/python3.4/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib64/python3.4/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "/home/username/value_check.py", line 338, in value_check
    global_logger.info("SplitTime: {str_timeDelta} -- COMPLETED: {Check_Name} --- Total Txn Count: {var_Total_Txn_Count} --- Criteria Txn Count: {var_Criteria_Txn_Count} --- Threshold: {Threshold} --- Low_Vol Threshold: {LowVolThresh}".format(str_timeDelta = timeDelta(datetime.now() - YAML_Config['start_time']), **YAML_Config))
Message: 'SplitTime: 00:01:05,031 -- COMPLETED: ALPHA_CHECK --- Total Txn Count: 1234--- Criteria Txn Count: 0 --- Threshold: 10 --- Low_Vol Threshold: 0'
Arguments: None

Ошибка в коде относится к объекту протоколирования в моем коде, но даже когда я помещаю try/except логику вокруг вызова, он ничего не делает, ошибка, кажется, происходит вверх по течению. Я также попытался изменить то, что регистрируется, от форматированной строки до простой строки, но безрезультатно. Похоже, что где-то по пути отдельные задания либо теряют связь с Queue, либо что-то в Queue выходит из строя и вызывает проблемы.

Есть идеи? Я работал над тем, чтобы получить более новую версию Python, которая была бы полезна по ряду причин (в частности, f-строк), но я не знаю, решит ли это эту проблему, и у меня заканчивается идеи устранения неполадок.

1 Ответ

0 голосов
/ 14 января 2019

Даже когда я помещаю попытку / кроме логики в вызов, он ничего не делает.

Вероятно, потому что, если пакет журналирования обнаружит исключение, связанное с самой регистрацией, он напечатает трассировку, но не вызовет само исключение. Это более подробно объясняется в строке документации для logging.Handler.handleError.

Один из способов начать с установки:

logging.raiseExceptions = True

Если атрибут levelExceptions уровня модуля имеет значение False, исключения автоматически игнорируются.

Если это не поможет, вы можете ввести в код код вызова import pdb; pdb.set_trace() для .emit(); что-то вроде:

def emit(self, record):
    try:
        msg = self.format(record)
        stream = self.stream
        stream.write(msg)
        stream.write(self.terminator)
        self.flush()
    except Exception as e:
        import pdb; pdb.set_trace()  # < ---
        self.handleError(record)

Где record будет LogRecord экземпляром. Обычно, когда я вижу ошибку регистрации, это потому, что я использовал неправильное количество аргументов для данной строки формата, но проверяю, что объект record должен, надеюсь, сказать вам больше.

Наконец, из стека вызовов есть и сам журнал вызовов:

global_logger.info(
    "SplitTime: {str_timeDelta} -- "
    "COMPLETED: {Check_Name} --- "
    "Total Txn Count: {var_Total_Txn_Count} --- "
    "Criteria Txn Count: {var_Criteria_Txn_Count} --- "
    "Threshold: {Threshold} --- "
    "Low_Vol Threshold: {LowVolThresh}".format(
    str_timeDelta = timeDelta(datetime.now() - YAML_Config['start_time']), **YAML_Config))

Трудно точно сказать, что в конечном итоге вызывает исключение, поскольку строка выглядит полностью отформатированной. (Хотя мы не видим YAML_Config.)

Независимо от этого, одна рекомендация: вы можете воспользоваться «ленивым» форматированием строк в журнале, а не str.format(), как в настоящее время. Вызов str.format() будет оценен, как только он сможет, тогда как если вы передадите kwargs в global_logger.info(), пакет журналирования будет ожидать их оценки до тех пор, пока он не должен.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...