подпроцесс, порожденный из задачи Celery, никогда не завершается и proc.poll () == ни один статус не сохраняется постоянно - PullRequest
0 голосов
/ 19 февраля 2019

Я заметил эту проблему, поэтому я написал следующий простой демонстрационный код, и проблема все еще существует.

У меня есть файл worker.py, который эмулирует какой-то длительный процесс и сообщает о текущем прогрессе через стандартный вывод

from sys import stdout
import time


def main():

    to_print = ("Start\n").encode("utf-8")
    stdout.buffer.write(to_print)
    stdout.flush()

    for i in range(0, 11):
        time.sleep(1)
        to_print = ("progress:%d\n" % (i*10)).encode("utf-8")
        stdout.buffer.write(to_print)
        stdout.flush()


if __name__ == "__main__":
    main()

Я регистрирую задачу сельдерея.В этой задаче я использую подпроцесс для вызова worker.py

@celery.task(name="tasks.runner", bind=True)
def run_task(self):

    proc = subprocess.Popen(["python", "worker.py"], bufsize=1, stdout=subprocess.PIPE)

    run_task_pid = os.getpid()
    update_progress(self, proc, run_task_pid)
    print("I am at the last line of run_task")

def update_progress(self, proc, run_task_pid):
    while True:
        print('in the loop')
        line = proc.stdout.readline()

   # I first notice the problem from here, I found it never breaks 
   # proc.poll() is always None suggesting proc never be terminated

        if line == "" and proc.poll() is not None:
            break

        linestr = line.decode("utf-8")

        if "progress:" in linestr:
            progress = int(linestr.strip().split(":")[-1])
            self.update_state(state="PROGRESS", meta={"progress": progress,
                                                      "subprocess_pid": proc.pid,
                                                      "run_task_pid": run_task_pid,
                                                      "description": ""})
    # I use this line to break because proc.poll() not work here
            if progress == 100:
                self.update_state(state="SUCCESS", meta={ "progress": progress,
                                                          "subprocess_pid": proc.pid,
                                                          "run_task_pid": run_task_pid,
                                                          "description": ""})
                break

        time.sleep(0.5)

. Я запускаю задачу сельдерея с помощью flask, просто простой обратный вызов, и у меня есть небольшой веб-сервис, который сообщает мне идентификаторы процесса.В любом случае задача сельдерея начинает работать так:

def worker():
    task = send_task('tasks.runner')

Когда задача сельдерея уже завершена, из подтверждения выглядит неплохо, последняя строка кода в run_task выполнена и сама задача «выполнена», как говорит Селериme:

$ I am at the last line of run_task
$ Task tasks.runner[0a0ff60c-ab09-4279-91f8-f9dd5010a49c] succeeded in 11.078552599996328s: None

Но если я посмотрел на ps, subprocess.pid все еще там с командой as (python), это объясняет, почему proc.poll () всегда предполагает, что подпроцесс все еще работает ...

PID TTY           TIME CMD
5160 ttys002    0:00.00 (python)

Это похоже на процесс зомби, но не требует ресурсов.Это очень раздражает, потому что, даже если это фальшивое зомби, процесс подпроцесса не имеет чистого сигнала выхода (в коде я должен прямо сказать, что мы можем прерваться, когда прогресс == 100, в противном случае, как вы можете сказать, работа сельдерея на самом деле будет работатьнавсегда, потому что это никогда не ломается).И также я не уверен, есть ли какая-либо другая проблема, потому что подпроцесс действительно не завершается.

Интересно, знает ли кто-нибудь эту проблему?Известно ли, что Celery работает с подпроцессом, потому что он создает странную зависимость от дочернего процесса?Я обнаружил, что у меня почти нет возможности убить его вручную, единственный способ - остановить самого Сельдерея.Я сделал что-то неправильно?Спасибо за вашу помощь

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...