Я заметил эту проблему, поэтому я написал следующий простой демонстрационный код, и проблема все еще существует.
У меня есть файл worker.py, который эмулирует какой-то длительный процесс и сообщает о текущем прогрессе через стандартный вывод
from sys import stdout
import time
def main():
to_print = ("Start\n").encode("utf-8")
stdout.buffer.write(to_print)
stdout.flush()
for i in range(0, 11):
time.sleep(1)
to_print = ("progress:%d\n" % (i*10)).encode("utf-8")
stdout.buffer.write(to_print)
stdout.flush()
if __name__ == "__main__":
main()
Я регистрирую задачу сельдерея.В этой задаче я использую подпроцесс для вызова worker.py
@celery.task(name="tasks.runner", bind=True)
def run_task(self):
proc = subprocess.Popen(["python", "worker.py"], bufsize=1, stdout=subprocess.PIPE)
run_task_pid = os.getpid()
update_progress(self, proc, run_task_pid)
print("I am at the last line of run_task")
def update_progress(self, proc, run_task_pid):
while True:
print('in the loop')
line = proc.stdout.readline()
# I first notice the problem from here, I found it never breaks
# proc.poll() is always None suggesting proc never be terminated
if line == "" and proc.poll() is not None:
break
linestr = line.decode("utf-8")
if "progress:" in linestr:
progress = int(linestr.strip().split(":")[-1])
self.update_state(state="PROGRESS", meta={"progress": progress,
"subprocess_pid": proc.pid,
"run_task_pid": run_task_pid,
"description": ""})
# I use this line to break because proc.poll() not work here
if progress == 100:
self.update_state(state="SUCCESS", meta={ "progress": progress,
"subprocess_pid": proc.pid,
"run_task_pid": run_task_pid,
"description": ""})
break
time.sleep(0.5)
. Я запускаю задачу сельдерея с помощью flask, просто простой обратный вызов, и у меня есть небольшой веб-сервис, который сообщает мне идентификаторы процесса.В любом случае задача сельдерея начинает работать так:
def worker():
task = send_task('tasks.runner')
Когда задача сельдерея уже завершена, из подтверждения выглядит неплохо, последняя строка кода в run_task выполнена и сама задача «выполнена», как говорит Селериme:
$ I am at the last line of run_task
$ Task tasks.runner[0a0ff60c-ab09-4279-91f8-f9dd5010a49c] succeeded in 11.078552599996328s: None
Но если я посмотрел на ps, subprocess.pid все еще там с командой as (python), это объясняет, почему proc.poll () всегда предполагает, что подпроцесс все еще работает ...
PID TTY TIME CMD
5160 ttys002 0:00.00 (python)
Это похоже на процесс зомби, но не требует ресурсов.Это очень раздражает, потому что, даже если это фальшивое зомби, процесс подпроцесса не имеет чистого сигнала выхода (в коде я должен прямо сказать, что мы можем прерваться, когда прогресс == 100, в противном случае, как вы можете сказать, работа сельдерея на самом деле будет работатьнавсегда, потому что это никогда не ломается).И также я не уверен, есть ли какая-либо другая проблема, потому что подпроцесс действительно не завершается.
Интересно, знает ли кто-нибудь эту проблему?Известно ли, что Celery работает с подпроцессом, потому что он создает странную зависимость от дочернего процесса?Я обнаружил, что у меня почти нет возможности убить его вручную, единственный способ - остановить самого Сельдерея.Я сделал что-то неправильно?Спасибо за вашу помощь