ожидание выполнения подпроцесса в пуле многопроцессорности - PullRequest
2 голосов
/ 15 марта 2012

Я запускаю пул рабочих и добавляю задания в этот пул. Каждый процесс создает подпроцесс с помощью браузера, ожидает загрузки страницы и затем делает снимок экрана. Иногда Opera показывает диалог сбоя для некорректно завершенного сеанса. Чтобы избежать этого, я убиваю вкладку через xkill и жду завершения работы браузера. Теперь мне нужно правильно обработать сигнал SIGTERM. После того, как сигнал был установлен и обработан в функции sig_handler, я запрещаю отправлять новые задания с помощью pool.close() и ожидаю завершения пула с помощью pool.join(). Если в пуле не запущены какие-либо подпроцессы, основной процесс обычно завершается, но когда в пуле есть подпроцесс, все рабочие процессы завершаются без ожидания завершения работы браузера. Как я могу нормально прекратить мой основной процесс?

#!/usr/bin/env python

#
# http://bugs.python.org/issue6766 in functions manager data packed by pickle
#

import redis
import pickle
import getopt
import time
import logging
import os
import sys
import pwd
import subprocess
import re
import urllib2
import signal
import multiprocessing
import httplib

# Define regexps
# Matches "Xvfb :N" lines in `ps ax` output; captures the display number N.
xvfb_reg = re.compile(r'Xvfb :(\d+)')
# Matches the opera window line in `xwininfo -tree` output; captures the
# hex window id of the fullscreen 1024x768 browser window.
browser_reg = re.compile(r'0x(\d+) .* \("opera" "Opera"\)  1024x768') 
# Main-loop flag; cleared by sig_handler when SIGTERM is received.
running = True 

def sig_handler(signum, frame):
    """
        SIGTERM handler: clear the global `running` flag so the main
        loop stops picking up new jobs and the process can shut down.
    """
    global running
    running = False

def check_url_code(url):
    """
        Probe an url before scheduling a screenshot job.
        Return True when the server answers 200 OK, False on any other
        status code or on a fetch/parse error.
    """
    try:
        response = urllib2.urlopen(url)
        return response.getcode() == 200
    except (urllib2.URLError, httplib.InvalidURL, ValueError):
        return False

def list_display():
    """
        Return the display numbers of all running Xvfb instances,
        scraped from `ps ax` output with xvfb_reg.
    """
    ps = subprocess.Popen(['/bin/ps', 'ax'], stdout=subprocess.PIPE)
    output, _ = ps.communicate()
    return xvfb_reg.findall(output)

def get_display(queue, lock):
    """
        Reserve a free X display for an opera instance.

        Polls the shared pickled state in queue['q'] under `lock` until a
        display not present in 'locked_displays' is available, marks it
        as locked and returns its identifier.

        FIX: the original `time.sleep(3)` was indented *outside* the
        `while` loop (dead code), so the loop busy-spun acquiring and
        releasing the lock; the sleep now runs on every empty iteration.
        The lock is also released via try/finally so an unpickling error
        cannot leave it held forever.
    """
    while True:
        lock.acquire()
        try:
            state = pickle.loads(queue['q'])
            free = list(set(state['displays']).difference(state['locked_displays']))
            if free:
                state['locked_displays'].append(free[0])
                queue['q'] = pickle.dumps(state)
                return free[0]
        finally:
            lock.release()
        time.sleep(3)

def get_screenshot(data, display):
    """
        Fork a background opera process on the given X display, wait for
        the page to load, capture the root window with ImageMagick's
        `import`, then xkill the opera window (opera with no open tabs
        exits cleanly) and reap the browser process.

        data: job dict with 'url', 'size' (120/240/400), 'path', 'file'.
        display: X display number (string) to run the browser on.
    """
    # Remove a stale autosaved session so opera does not show a crash /
    # session-recovery dialog for a previously killed instance.
    # FIX: the bare `except:` also swallowed KeyboardInterrupt/SystemExit;
    # only a missing file is expected here.
    try:
        os.remove('.opera/{0}/sessions/autosave.win'.format(display))
    except OSError:
        pass

    proc = subprocess.Popen(['/usr/bin/opera', '-geometry', '1024x768+0+0',
                             '-fullscreen', '-display', ':{0}'.format(display),
                             '-pd', '.opera/{0}'.format(display), data['url']])
    time.sleep(10)  # crude wait for the page to load

    # Map the requested thumbnail size to an ImageMagick geometry string.
    # FIX: the original if/elif chain left `geometry` undefined for any
    # other size, crashing later with NameError; fail early and clearly.
    geometries = {120: '120x90', 240: '240x151', 400: '400x300'}
    try:
        geometry = geometries[int(data['size'])]
    except KeyError:
        proc.terminate()
        raise ValueError('Unsupported screenshot size: {0}'.format(data['size']))

    try:
        os.makedirs(data['path'])
    except OSError:
        pass  # target directory already exists

    # Find the opera window id on this display via xwininfo's tree dump.
    # NOTE(review): findall(...)[0] raises IndexError when the window is
    # not (yet) mapped -- original behavior, kept as-is.
    xwin_proc = subprocess.Popen(['/usr/bin/xwininfo', '-display',
                                  ':{0}'.format(display), '-root', '-tree'],
                                 stdout=subprocess.PIPE)
    xwin_info = xwin_proc.communicate()[0]
    window = browser_reg.findall(xwin_info)[0]

    time.sleep(5)

    pimport = subprocess.Popen(['/usr/bin/import', '-display', ':{0}'.format(display),
                                '-window', 'root', '-resize', geometry, data['file']],
                               stdout=subprocess.PIPE)
    pimport.wait()

    logging.info('Screenshot {0} for {1}: display={2}, window=0x{3}, file={4}'.format(geometry, data['url'], display, window, data['file']))

    # Kill only the tab window: opera then shuts down by itself (no crash
    # dialog on the next start) and proc.wait() reaps the browser.
    pxkill = subprocess.Popen(['/usr/bin/xkill', '-display', ':{0}'.format(display),
                               '-id', '0x{0}'.format(window)])
    pxkill.wait()  # FIX: reap xkill too, the original left a zombie
    proc.wait()


def worker_process(data, display, lock, connection, queue):
    """
        Pool worker: take the screenshot, then release the display back
        into the shared queue state and update job bookkeeping in redis.
    """
    get_screenshot(data, display)

    # Give the display back under the shared lock.
    with lock:
        state = pickle.loads(queue['q'])
        state['locked_displays'].remove(display)
        queue['q'] = pickle.dumps(state)

    # Mark the job as done and bump the completion counter.
    connection.hdel('jobs', data['md5_url'])
    connection.hincrby('stats', 'completed', 1)


def main(pool, queue, lock, connection, job):
    """
        Validate a queued job and dispatch it to the worker pool.

        Skips the job when the screenshot file already exists or the url
        does not answer 200 OK; otherwise reserves a free display and
        schedules worker_process asynchronously.
    """
    data = pickle.loads(job)

    # Screenshot already produced by an earlier queue run -- just drop
    # the job marker.
    if os.path.isfile(data['path']):
        connection.hdel('jobs', data['md5_url'])
        return

    # FIX: the original acquired `lock` and unpickled queue['q'] into an
    # unused local around this check. The url probe touches no shared
    # state, so holding the global display lock across a network fetch
    # only stalled the workers for no benefit.
    if not check_url_code(data['url']):
        logging.error('Error fetching {0}'.format(data['url']))
        connection.hdel('jobs', data['md5_url'])
        return

    display = get_display(queue, lock)
    pool.apply_async(worker_process, args=(data, display, lock, connection, queue))

def create_daemon(home):
    """
        Daemonize the current process via the classic double fork:
        detach from the controlling terminal, chdir to `home`, close all
        inherited file descriptors and point stdio at /dev/null.
        Returns 0 in the surviving daemon process; the two intermediate
        parents exit with os._exit(0).
    """
    # First fork: the parent exits so the child is re-parented to init.
    try:
        pid = os.fork()
    except OSError:
        sys.exit('Can not demonize process')

    if pid == 0:
        # Become a session leader, detaching from the controlling tty.
        os.setsid()

        # Second fork: the session leader exits so the grandchild can
        # never reacquire a controlling terminal.
        try:
            pid = os.fork()
        except OSError:
            sys.exit('Can not demonize process')

        if pid == 0:
            os.chdir(home)
            os.umask(0)
        else:
            os._exit(0)
    else:
        os._exit(0)

    # Close every inherited file descriptor up to the soft fd limit
    # (capped at 1024 when the limit is unlimited).
    import resource
    maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if (maxfd == resource.RLIM_INFINITY):
        maxfd = 1024 
    for fd in range(0, maxfd):
        try:
            os.close(fd)
        except OSError:
            pass  # fd was not open

    # Reopen fd 0 on /dev/null and duplicate it onto stdout/stderr so
    # stray writes do not fail with EBADF.
    if hasattr(os, 'devnull'):
        console = os.devnull
    else:
        console = '/dev/null'
    os.open(console, os.O_RDWR)

    os.dup2(0, 1)
    os.dup2(0, 2)

    return (0)

def help():
    """Print command-line usage to stdout (Python 2 print statement)."""
    # NOTE(review): the text mentions "Chrome user data dirs" while the
    # rest of the script drives Opera profile dirs -- likely copied from
    # a sibling script; confirm and update the message separately.
    print """
Usage: {0} --u screenshot -l /var/log/screenshot/server.log -p /var/run/screenshot.pid

    --user   Set unprivileged user for process. This user can't be nobody, because script
    -u       reads home directory from passwd and uses it for Chrome user data dirs.

   --log     Set log file.
   -l

   --pid     Set pid file.
   -p

   --help    This help.
   -h
    """.format(sys.argv[0])

if __name__ == '__main__':
    # Defaults, overridable from the command line.
    log_file = '/var/log/screenshot/server.log'
    pid_file = '/var/run/screenshot.pid'
    user = None

    # FIX: long options that take a value need a trailing '=' in getopt
    # ('log=' etc.); without it '--log file.log' parses as a bare flag
    # and the argument is silently dropped.
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'l:p:u:h',
                                   ['log=', 'pid=', 'user=', 'help'])
    except getopt.GetoptError:
        help()
        sys.exit(2)

    for opt, arg in opts:
        if opt in ('-h', '--help'):
            help()
            sys.exit() 
        elif opt in ('-l', '--log'):
            log_file = arg
        elif opt in ('-p', '--pid'):
            pid_file = arg
        elif opt in ('-u', '--user'):
            user = arg

    # Drop root privileges to the requested user; the user's home dir is
    # needed for the per-display browser profile dirs.
    if user:
        if not os.geteuid() == 0:
            sys.exit('You need root privileges to set user')
        try:
            userl = pwd.getpwnam(user)
            uid = userl.pw_uid
            home = userl.pw_dir
        except KeyError:
            sys.exit('User {0} does not exist'.format(user))

        os.setuid(uid)
        os.chdir(home)
    else:
        sys.exit('You must set user')

    # Fork child process for demonization
    retval = create_daemon(home)

    # Write pid to pidfile.
    # FIX: use `with` so the file is flushed and closed deterministically
    # instead of relying on refcount finalization.
    pid = os.getpid()
    with open(pid_file, 'w') as pf:
        pf.write(str(pid))

    # Open logfile
    logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
                    datefmt='%m-%d %H:%M',
                    filename=log_file)

    logging.info('Starting server with pid {0}'.format(os.getpid()))

    # Get working displays and size the pool to one worker per display.
    displays = list_display() 
    logging.info('Found displays: {0}'.format(' '.join(displays)))    

    pool = multiprocessing.Pool(processes=len(displays))
    # Shared state: a pickled dict under key 'q' in a Manager dict,
    # guarded by a Manager lock (pickled to dodge http://bugs.python.org/issue6766).
    queue = multiprocessing.Manager().dict()
    queue['q'] = pickle.dumps({
                    'displays' : displays, 
                    'termination' : False,
                    'locked_displays' : []})
    lock = multiprocessing.Manager().Lock()
    connection = redis.Redis('localhost')

    # Handle termination signals
    signal.signal(signal.SIGTERM, sig_handler)

    # Main loop: drain high-priority jobs first, fall back to low
    # priority, idle-sleep when both queues are empty.
    while running:
        job = connection.lpop('high_priority')  
        if job is None:
            job = connection.rpop('low_priority')
        if job is not None:
            main(pool, queue, lock, connection, job)
        else:
            time.sleep(5)

    # SIGTERM received: stop accepting jobs and wait for running workers.
    logging.info('Server stopped') 
    pool.close()
    pool.join()
    os._exit(0)
...