В экземпляре HyperOpt с высокой степенью параллелизации почему не работает subprocess.Popen / os.system с> и 2>? - PullRequest
0 голосов
/ 09 июня 2019

Мне нужно запустить HyperOpt очень параллельно (например, 200+ работников).С менее чем 200 работниками все работает отлично.Но когда запускается больше рабочих, я больше не могу получить Вывод какой-либо программы (которая мне нужна) из целевой функции.

Я уже попробовал subprocess.popen (), и он работал отлично, если не более200 рабочих.Поэтому я переключился с него на os.system (program + ">" stderr + "2>" stdout), а затем прочитал файлы stderrlog и stdoutlog, что также прекрасно работает, если в нем не более 200 рабочих.

Файл stderrlog работает отлично.Но stdoutlog большую часть времени остается пустым или получает только несколько случайных строк.Файлы существуют, однако, они только неполные.Похоже, что программы не завершаются, и в журналах нет никаких признаков того, что что-то идет не так, за исключением того, что HyperOpt не находит результаты.Опять же, это работает, если я делаю такие вещи, как 'echo "hello world" как код (очень короткие выводы).

Я выполняю это в SLURM-среде на кластере HPC, но нетошибки slurmнесколько тысяч строк кода (он снова вызывает скрипт python), который мне нужен для дальнейшего анализа, поэтому просто поиск РЕЗУЛЬТАТОВ в выводе этого не поможет.

Я также пробовал это какКод run_program (с дополнительным лог-файлом), который, согласно веб-сайту, который я больше не могу найти, утверждает, что это решит проблемы с размером буфера Popen, превышающим 64K:

def print_to_log(string, logfile):
    folder = os.path.dirname(logfile)
    pathlib.Path(folder).mkdir(parents=True, exist_ok=True)

    append_write = 'a'
    if not os.path.exists(logfile):
        append_write = 'a'
    else:
        append_write = 'w'

    logfilehandler = open(logfile, append_write)

    print(string, file=logfilehandler)

    logfilehandler.close()

import subprocess

def objective_function_mongodb(parameter):
    programconverted = []
    print_to_log("Before Conversion", logfile)
    for i in program.split():
        programconverted.append(str(i))
        print_to_log("In Conversion", logfile)

    print_to_log("Definition of MAX_OUTPUT_SIZE", logfile)
    MAX_OUTPUT_SIZE = 2 ** 64

    print_to_log("Starting Timer", logfile)
    start = time.time()
    print_to_log("Intializing Sockets", logfile)
    stdout = socket.socketpair()
    stderr = socket.socketpair()
    # nonblocking and timeout is not the same, timeout is easier to handle via socket.timeout exception
    print_to_log("Setting Timeouts", logfile)
    stdout[0].settimeout(0.01)
    stderr[0].settimeout(0.01)
    print_to_log("Begin popen", logfile)
    p = subprocess.Popen(programconverted, stdout=stdout[1], stderr=stderr[1], close_fds=True)
    print_to_log("Create empty out und err", logfile)
    out, err = "", ""
    print_to_log("Initizializing returncode", logfile)
    returncode = None
    print_to_log("Begin Loop", logfile)
    loopnr = 0

    while True:
        print_to_log("p.poll()", logfile)
        p.poll()

        print_to_log("Get stdout/stderr", logfile)
        try:
            outtmp = stdout[0].recv(4096).decode('utf-8')
        except socket.timeout as exc:
            outtmp = ""

        try:
            errtmp = stderr[0].recv(4096).decode('utf-8')
        except socket.timeout as exc:
            errtmp = ""

        print_to_log("Adding (out/err)tmp onto (out/err), loopnr: " + str(loopnr), logfile)
        out += str(outtmp)
        err += str(errtmp)

        if len(out) > MAX_OUTPUT_SIZE or \
           len(err) > MAX_OUTPUT_SIZE:
            print_to_log("Killing process because it's output is bigger than MAX_OUTPUT_SIZE", logfile)
            p.kill()
            p.wait()
            out = out[:MAX_OUTPUT_SIZE]
            err = err[:MAX_OUTPUT_SIZE]
            out += "Output was truncated to " + str(MAX_OUTPUT_SIZE)

        if p.returncode != None:
            print_to_log("Returncode: " + str(p.returncode), logfile)
            returncode = p.returncode
            break
        time.sleep(0.1)
        loopnr = loopnr + 1
    end = time.time()
    # we now have: returncode, out, err, start, end

    stderr[0].close()
    stdout[1].close()

    out = str(out)
    err = str(err)

    array = {
        "stdout": out,
        "stderr": err,
        "retcode": returncode
    }



    return array

Вdebug-logfiles (печатается print_to_log) только одна случайная строка «Добавление (out / err) tmp на (out / err)», loopnr: «+ str (loopnr)» со случайным номером строки, но ничего больше (ничего дои ничего после этого.) программаоднако, согласно журналам, его никогда не убивают.

Файлы stderr работают, но файлы stdout в основном либо пусты, либо показывают только случайные строки из выходных данных, если параллельно работают более 200 рабочих.

Я исключил это, чтобы написать stdout отлично, особенно при использовании> и 2>.

ulimit -a показывает мне

ulimit -a
core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 256025
max locked memory       (kbytes, -l) unlimited
max memory size         (kbytes, -m) 307200
open files                      (-n) 1048576
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) unlimited
cpu time               (seconds, -t) unlimited
max user processes              (-u) 4096
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited

1 Ответ

0 голосов
/ 09 июня 2019

Вы, вероятно, работаете на пределе ОС; с тремя файловыми дескрипторами на процесс, 200 подпроцессов требуют 600 открытых файловых дескрипторов. Некоторые ОС устанавливают этот предел в несколько десятков; но, возможно, вы можете увеличить предел с помощью ulimit.

в сторону; не обижайся, но твое run_process - довольно грубое переосмысление subprocess.run().

...