Как мне войти при использовании многопроцессорной обработки в Python? - PullRequest
199 голосов
/ 13 марта 2009

Прямо сейчас у меня есть центральный модуль в каркасе, который порождает несколько процессов, используя Python 2.6 multiprocessing module . Поскольку он использует multiprocessing, существует журнал, поддерживающий многопроцессорность на уровне модуля, LOG = multiprocessing.get_logger(). Согласно документам , этот регистратор имеет блокировку, разделяемую процессами, чтобы вы не искажали вещи в sys.stderr (или любом другом дескрипторе файла), когда в него одновременно записываются несколько процессов.

Проблема, с которой я столкнулся сейчас, заключается в том, что другие модули в платформе не поддерживают многопроцессорность Как мне кажется, мне нужно сделать так, чтобы все зависимости этого центрального модуля использовали многопроцессорное ведение журнала. Это раздражает внутри платформы, не говоря уже о всех клиентах платформы. Есть ли альтернативы, о которых я не думаю?

Ответы [ 19 ]

3 голосов
/ 13 марта 2009

просто опубликуйте где-нибудь свой экземпляр регистратора. таким образом, другие модули и клиенты могут использовать ваш API для получения регистратора без необходимости import multiprocessing.

2 голосов
/ 13 марта 2014

Как насчет делегирования всего ведения журнала другому процессу, который считывает все записи журнала из очереди?

LOG_QUEUE = multiprocessing.JoinableQueue()

class CentralLogger(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.log = logger.getLogger('some_config')
        self.log.info("Started Central Logging process")

    def run(self):
        while True:
            log_level, message = self.queue.get()
            if log_level is None:
                self.log.info("Shutting down Central Logging process")
                break
            else:
                self.log.log(log_level, message)

central_logger_process = CentralLogger(LOG_QUEUE)
central_logger_process.start()

Просто поделитесь LOG_QUEUE с помощью любого из многопроцессорных механизмов или даже наследования, и все это прекрасно работает!

1 голос
/ 13 сентября 2016

Вот мой простой взлом / обходной путь ... не самый полный, но легко модифицируемый и более простой для чтения и понимания, я думаю, чем любые другие ответы, которые я нашел до написания этого:

import logging
import multiprocessing

class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))

def some_other_func_that_gets_logger_and_logs(num):
    # notice the name get's discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2

def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)
1 голос
/ 01 июня 2016

Ниже приведен класс, который можно использовать в среде Windows, требуется ActivePython. Вы также можете наследовать другие обработчики журналов (StreamHandler и т. Д.)

class SyncronizedFileHandler(logging.FileHandler):
    MUTEX_NAME = 'logging_mutex'

    def __init__(self , *args , **kwargs):

        self.mutex = win32event.CreateMutex(None , False , self.MUTEX_NAME)
        return super(SyncronizedFileHandler , self ).__init__(*args , **kwargs)

    def emit(self, *args , **kwargs):
        try:
            win32event.WaitForSingleObject(self.mutex , win32event.INFINITE)
            ret = super(SyncronizedFileHandler , self ).emit(*args , **kwargs)
        finally:
            win32event.ReleaseMutex(self.mutex)
        return ret

А вот пример, демонстрирующий использование:

import logging
import random , time , os , sys , datetime
from string import letters
import win32api , win32event
from multiprocessing import Pool

def f(i):
    time.sleep(random.randint(0,10) * 0.1)
    ch = random.choice(letters)
    logging.info( ch * 30)


def init_logging():
    '''
    initilize the loggers
    '''
    formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    file_handler = SyncronizedFileHandler(sys.argv[1])
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

#must be called in the parent and in every worker process
init_logging() 

if __name__ == '__main__':
    #multiprocessing stuff
    pool = Pool(processes=10)
    imap_result = pool.imap(f , range(30))
    for i , _ in enumerate(imap_result):
        pass
1 голос
/ 05 августа 2010

У меня есть решение, похожее на Ironhacker, за исключением того, что я использую logging.exception в своем коде и обнаружил, что мне нужно отформатировать исключение перед его передачей через очередь, так как трассировки не могут быть засолены:

class QueueHandler(logging.Handler):
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue
    def emit(self, record):
        if record.exc_info:
            # can't pass exc_info across processes so just format now
            record.exc_text = self.formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)
    def formatException(self, ei):
        sio = cStringIO.StringIO()
        traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
        s = sio.getvalue()
        sio.close()
        if s[-1] == "\n":
            s = s[:-1]
        return s
0 голосов
/ 07 апреля 2018

Есть этот замечательный пакет

Пакет: https://pypi.python.org/pypi/multiprocessing-logging/

Код: https://github.com/jruere/multiprocessing-logging

Установка:

pip install multiprocessing-logging

Затем добавьте:

import multiprocessing_logging

# This enables logs inside process
multiprocessing_logging.install_mp_handler()
0 голосов
/ 07 февраля 2018

Моим детям, которые десятилетиями сталкивались с той же проблемой и нашли этот вопрос на этом сайте, я оставляю этот ответ.

Простота против усложнения. Просто используйте другие инструменты. Python потрясающий, но он не был предназначен для некоторых вещей.

Следующий фрагмент для logrotate демон работает для меня и не слишком усложняет вещи. Запланируйте его запуск по часам и

/var/log/mylogfile.log {
    size 1
    copytruncate
    create
    rotate 10
    missingok
    postrotate
        timeext=`date -d '1 hour ago' "+%Y-%m-%d_%H"`
        mv /var/log/mylogfile.log.1 /var/log/mylogfile-$timeext.log
    endscript
}

Вот как я его устанавливаю (символические ссылки не работают для logrotate):

sudo cp /directpath/config/logrotate/myconfigname /etc/logrotate.d/myconfigname
sudo cp /etc/cron.daily/logrotate /etc/cron.hourly/logrotate
0 голосов
/ 13 марта 2009

Одна из альтернатив - записать протокол многопроцессорной обработки в известный файл и зарегистрировать обработчик atexit, чтобы присоединиться к этим процессам и прочитать его обратно на stderr; тем не менее, вы не получите поток в реальном времени к выводимым сообщениям на stderr таким образом.

0 голосов
/ 26 марта 2015

Если у вас есть взаимоблокировки, возникающие в комбинации блокировок, потоков и вилок в модуле logging, о чем сообщается в отчете об ошибке 6721 (см. Также связанный вопрос SO ) .

Здесь выложено небольшое исправление здесь .

Однако это только исправит любые потенциальные тупики в logging. Это не исправит, что вещи могут быть искажены. Смотрите другие ответы, представленные здесь.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...