Как мне войти при использовании многопроцессорной обработки в Python? - PullRequest
199 голосов
/ 13 марта 2009

Прямо сейчас у меня есть центральный модуль в каркасе, который порождает несколько процессов, используя Python 2.6 multiprocessing module . Поскольку он использует multiprocessing, существует журнал, поддерживающий многопроцессорность на уровне модуля, LOG = multiprocessing.get_logger(). Согласно документам , этот регистратор имеет блокировку, разделяемую процессами, чтобы вы не искажали вещи в sys.stderr (или любом другом дескрипторе файла), когда в него одновременно записываются несколько процессов.

Проблема, с которой я столкнулся сейчас, заключается в том, что другие модули в платформе не поддерживают многопроцессорность Как мне кажется, мне нужно сделать так, чтобы все зависимости этого центрального модуля использовали многопроцессорное ведение журнала. Это раздражает внутри платформы, не говоря уже о всех клиентах платформы. Есть ли альтернативы, о которых я не думаю?

Ответы [ 19 ]

106 голосов
/ 21 мая 2009

Я только что написал собственный обработчик журнала, который просто передает все в родительский процесс через канал. Я тестировал его всего десять минут, но, похоже, он работает довольно хорошо.

( Примечание: Это жестко закодировано в RotatingFileHandler, что является моим собственным вариантом использования.)


Обновление: @javier теперь поддерживает этот подход как пакет, доступный в Pypi - см. multiprocessing-logging в Pypi, github в https://github.com/jruere/multiprocessing-logging


Обновление: реализация!

Теперь это использует очередь для правильной обработки параллелизма, а также корректно восстанавливается после ошибок. Сейчас я использую это в производстве в течение нескольких месяцев, и текущая версия ниже работает без проблем.

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)
57 голосов
/ 13 марта 2009

Единственный способ справиться с этим ненавязчиво - это:

  1. Создает каждый рабочий процесс таким образом, чтобы его журнал отправлялся в другой файловый дескриптор (на диск или в канал). В идеале все записи журнала должны иметь метки времени.
  2. Ваш процесс контроллера может затем сделать один из следующих действий:
    • При использовании файлов на диске: Объединить файлы журналов в конце цикла, отсортированные по отметке времени
    • При использовании труб (рекомендуется): Объединять записи журнала на лету из всех труб в центральный файл журнала. (Например, периодически select из файловых дескрипторов каналов, выполните сортировку слиянием доступных записей журнала и выполните сброс в централизованный журнал. Повторите.)
19 голосов
/ 18 августа 2015

В кулинарной книге регистрации Python есть два полных примера: https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes

Используется QueueHandler, который является новым в Python 3.2, но его легко скопировать в собственный код (как я делал это в Python 2.7) из: https://gist.github.com/vsajip/591589

Каждый процесс помещает свою запись в Queue, а затем поток или процесс listener (для каждого из них предоставляется один пример) выбирает их и записывает их все в файл - без риска повреждения или искажения.

19 голосов
/ 13 марта 2009

Еще одной альтернативой могут быть различные не-файловые обработчики журналирования в пакете logging :

  • SocketHandler
  • DatagramHandler
  • SyslogHandler

(и другие)

Таким образом, вы могли бы легко иметь демон регистрации, куда бы вы могли писать безопасно и правильно обрабатывать результаты. (Например, простой сервер сокетов, который просто распаковывает сообщение и отправляет его в свой собственный обработчик вращающихся файлов.)

SyslogHandler позаботится об этом и для вас. Конечно, вы можете использовать свой собственный экземпляр syslog, а не системный.

13 голосов
/ 23 января 2016

Ниже приведено еще одно решение с упором на простоту для всех, кто (как и я), кто пришел сюда из Google. Регистрация должна быть легкой! Только для 3.2 или выше.

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()
13 голосов
/ 15 июля 2010

Вариант других, в котором ведение журнала и потока очереди разделены.

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not realiable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)
10 голосов
/ 16 октября 2013

Все текущие решения слишком связаны с конфигурацией регистрации с использованием обработчика. Мое решение имеет следующую архитектуру и функции:

  • Вы можете использовать любую конфигурацию регистрации, которую хотите
  • Ведение журнала выполняется в потоке демона
  • Безопасное отключение демона с помощью диспетчера контекста
  • Связь с журналом ведется с помощью multiprocessing.Queue
  • В подпроцессах logging.Logger (и уже определенные экземпляры) исправлены для отправки всех записей в очередь
  • Новый : отформатировать трассировку и сообщение перед отправкой в ​​очередь, чтобы предотвратить ошибки посадки

Код с примером использования и выходом можно найти в следующей Gist: https://gist.github.com/schlamar/7003737

7 голосов
/ 19 октября 2016

Поскольку мы можем представить многопроцессорное ведение журнала как много издателей и одного подписчика (слушателя), использование ZeroMQ для реализации обмена сообщениями PUB-SUB действительно является опцией.

Кроме того, модуль PyZMQ , привязки Python для ZMQ, реализует PUBHandler , который является объектом для публикации сообщений регистрации через сокет zmq.PUB.

В сети существует решение для централизованного ведения журналов из распределенного приложения с использованием PyZMQ и PUBHandler, которое может быть легко адаптировано для локальной работы с несколькими процессами публикации.

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}

# This one will be used by publishing processes
class PUBLogger:
    def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger

# This one will be used by listener process
class SUBLogger:
    def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.DEBUG)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{1}'.format(ip, port))
        self._sub.setsockopt(zmq.SUBSCRIBE, "")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir,                 "client_debug.log"), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

  @property
  def sub(self):
      return self._sub

  @property
  def logger(self):
      return self._logger

#  And that's the way we actually run things:

# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
    sub_logger = SUBLogger(ip)
    while not event.is_set():
        try:
            topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
            log_msg = getattr(logging, topic.lower())
            log_msg(message)
        except zmq.ZMQError as zmq_error:
            if zmq_error.errno == zmq.EAGAIN:
                pass


# Publisher processes loggers should be initialized as follows:

class Publisher:
    def __init__(self, stop_event, proc_id):
        self.stop_event = stop_event
        self.proc_id = proc_id
        self._logger = pub_logger.PUBLogger('127.0.0.1').logger

     def run(self):
         self._logger.info("{0} - Sending message".format(proc_id))

def run_worker(event, proc_id):
    worker = Publisher(event, proc_id)
    worker.run()

# Starting subscriber process so we won't loose publisher's messages
sub_logger_process = Process(target=run_sub_logger,
                                 args=('127.0.0.1'), stop_event,))
sub_logger_process.start()

#Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
    processes.append(Process(target=run_worker,
                                 args=(stop_event, i,)))
for p in processes:
    p.start()
6 голосов
/ 18 июня 2009

Мне также нравится ответ zzzeek, ​​но Андре прав, что для предотвращения искажений требуется очередь. Мне немного повезло с трубкой, но я увидел искаженный звук, что несколько ожидаемо. Реализовать его оказалось сложнее, чем я думал, особенно из-за работы в Windows, где есть некоторые дополнительные ограничения для глобальных переменных и прочего (см .: Как реализована многопроцессорная обработка Python в Windows? )

Но, наконец, я заработал. Этот пример, вероятно, не идеален, поэтому комментарии и предложения приветствуются. Он также не поддерживает настройку форматера или чего-либо другого, кроме корневого регистратора. По сути, вам необходимо переустановить регистратор в каждом из процессов пула с очередью и настроить другие атрибуты в регистраторе.

Опять же, любые предложения о том, как улучшить код, приветствуются. Я, конечно, еще не знаю всех трюков с Python: -)

import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue

class MultiProcessingLogHandler(logging.Handler):
    def __init__(self, handler, queue, child=False):
        logging.Handler.__init__(self)

        self._handler = handler
        self.queue = queue

        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if child == False:
            self.shutdown = False
            self.polltime = 1
            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        #print "receive on"
        while (self.shutdown == False) or (self.queue.empty() == False):
            # so we block for a short period of time so that we can
            # check for the shutdown cases.
            try:
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Queue.Empty, e:
                pass

    def send(self, s):
        # send just puts it in the queue for the server to retrieve
        self.queue.put(s)

    def _format_record(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record) # just to get traceback text into record.exc_text
            record.exc_info = None  # to avoid Unpickleable error

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        time.sleep(self.polltime+1) # give some time for messages to enter the queue.
        self.shutdown = True
        time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown

    def __del__(self):
        self.close() # hopefully this aids in orderly shutdown when things are going poorly.

def f(x):
    # just a logging command...
    logging.critical('function number: ' + str(x))
    # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
    time.sleep(x % 3)

def initPool(queue, level):
    """
    This causes the logging module to be initialized with the necessary info
    in pool threads to work correctly.
    """
    logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
    logging.getLogger('').setLevel(level)

if __name__ == '__main__':
    stream = StringIO.StringIO()
    logQueue = multiprocessing.Queue(100)
    handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
    logging.getLogger('').addHandler(handler)
    logging.getLogger('').setLevel(logging.DEBUG)

    logging.debug('starting main')

    # when bulding the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
    pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
    pool.map(f, range(0,50))
    pool.close()

    logging.debug('done')
    logging.shutdown()
    print "stream output is:"
    print stream.getvalue()
3 голосов
/ 06 июня 2009

Мне понравился ответ zzzeek. Я бы просто заменил канал на очередь, поскольку, если несколько потоков / процессов используют один и тот же конец канала для создания сообщений журнала, они будут искажены.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...