python уведомление о многопроцессорном пуле при обновлении рабочего - PullRequest
1 голос
/ 15 января 2020

Я использую Python 2,7 multiprocessing.Pool для управления пулом из 3 рабочих. Каждый работник довольно сложен, и в стороннем коде есть утечка ресурсов (предположительно), которая вызывает проблемы после 6-8 часов непрерывной работы. Поэтому я хотел бы использовать maxtasksperchild, чтобы периодически обновлять работников.

Я также хотел бы, чтобы каждый работник записывал в свой отдельный файл журнала. Без maxtasksperchild я использую общий multiprocessing.Value, чтобы назначить целое число (0, 1 или 2) для каждого работника, затем использую целое число для имени файла журнала.

С maxtasksperchild Я бы хотел бы повторно использовать файлы журнала, как только работник сделан. Поэтому, если все это работает в течение месяца, мне нужны только три файла журнала, а не один файл журнала для каждого порожденного работника.

Если бы я мог передать обратный вызов (например, от finalizer до go вместе с initializer, поддерживаемым в настоящее время), это будет просто. Без этого я не вижу надежного и простого способа сделать это.

Ответы [ 2 ]

1 голос
/ 16 января 2020

Это AFAIK недокументированный, но multiprocessing имеет класс Finalizer, "который поддерживает финализацию объекта с использованием слабых ссылок". Вы можете использовать его для регистрации финализатора в вашем initializer.

Я не вижу multiprocessing.Value полезного выбора синхронизации в этом случае. Несколько рабочих могут выйти одновременно, сигнализируя, какие целые числа файлов свободны, - это больше, чем может обеспечить (заблокированный) счетчик.

Я бы предложил использовать несколько голых multiprocessing.Lock с, по одному для каждого файла, вместо:

from multiprocessing import Pool, Lock, current_process
from multiprocessing.util import Finalize


def f(n):
    global fileno
    for _ in range(int(n)):  # xrange for Python 2
        pass
    return fileno


def init_fileno(file_locks):
    for i, lock in enumerate(file_locks):
        if lock.acquire(False):  # non-blocking attempt
            globals()['fileno'] = i
            print("{} using fileno: {}".format(current_process().name, i))
            Finalize(lock, lock.release, exitpriority=15)
            break


if __name__ == '__main__':

    n_proc = 3
    file_locks = [Lock() for _ in range(n_proc)]

    pool = Pool(
        n_proc, initializer=init_fileno, initargs=(file_locks,),
        maxtasksperchild=2
    )

    print(pool.map(func=f, iterable=[50e6] * 18))
    pool.close()
    pool.join()
    # all locks should be available if all finalizers did run
    assert all(lock.acquire(False) for lock in file_locks)

Вывод:

ForkPoolWorker-1 using fileno: 0
ForkPoolWorker-2 using fileno: 1
ForkPoolWorker-3 using fileno: 2
ForkPoolWorker-4 using fileno: 0
ForkPoolWorker-5 using fileno: 1
ForkPoolWorker-6 using fileno: 2
[0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2]

Process finished with exit code 0

Обратите внимание, что с Python 3 Вы не можете надежно использовать менеджер контекста пула вместо старого способа сделать это, показанного выше. Менеджер контекста пула (к сожалению) вызывает terminate(), что может привести к остановке рабочих процессов до того, как финализатор сможет запустить.

0 голосов
/ 16 января 2020

Я закончил со следующим. Предполагается, что идентификаторы PID перерабатываются не очень быстро (верно для Ubuntu для меня, но не для Unix). Я не думаю, что он делает какие-то другие предположения, но на самом деле я просто интересуюсь Ubuntu, поэтому я не смотрел на другие платформы, такие как Windows.

Код использует массив для отслеживания из которых PID заявили, какой индекс. Затем, когда запускается новый работник, он проверяет, не используются ли какие-либо PID. Если он находит его, он предполагает, что это потому, что работник завершил свою работу (или был уволен по другой причине). Если его не найти, нам не повезло! Так что это не идеально, но я думаю, что это проще, чем все, что я видел до сих пор или рассматривал.

def run_pool():
    child_pids = Array('i', 3)
    pool = Pool(3, initializser=init_worker, initargs=(child_pids,), maxtasksperchild=1000)

def init_worker(child_pids):
    with child_pids.get_lock():
        available_index = None
        for index, pid in enumerate(child_pids):
            # PID 0 means unallocated (this happens when our pool is started), we reclaim PIDs
            # which are no longer in use. We also reclaim the lucky case where a PID was recycled
            # but assigned to one of our workers again, so we know we can take it over
            if not pid or not _is_pid_in_use(pid) or pid == os.getpid():
                available_index = index
                break

        if available_index is not None:
            child_pids[available_index] = os.getpid()
        else:
            # This is unexpected - it means all of the PIDs are in use so we have a logical error
            # or a PID was recycled before we could notice and reclaim its index
            pass

def _is_pid_in_use(pid):
    try:
        os.kill(pid, 0)
        return True
    except OSError:
        return False
...