Я занимаюсь разработкой веб-службы, которая будет использоваться в качестве поставщика «база данных как услуга». Цель состоит в том, чтобы создать небольшой веб-сервис на основе flask, работающий на некоторых хостах, и «рабочие» процессы, работающие на разных хостах, принадлежащих разным командам. Всякий раз, когда член команды приходит и запрашивает новую базу данных, я должен создать ее на своем хосте. Теперь проблема ... Служба, которую я запускаю, должна быть запущена. Рабочий, однако, может быть перезапущен. Может случиться 5 минут может случиться 5 дней. Простой Popen не сработает, потому что он создаст дочерний процесс, и если работник остановится позже, процесс Popen будет уничтожен (я попробовал это).
У меня есть реализация, использующая многопроцессорную работу, которая работает как чемпион, к сожалению, я не могу использовать это с сельдереем. так не повезло там. Я пытался уйти от многопроцессорной библиотеки с помощью двойного разветвления и именованных каналов. Самый минимальный пример, который я мог бы произвести:
def launcher2(working_directory, cmd, *args):
command = [cmd]
command.extend(list(args))
process = subprocess.Popen(command, cwd=working_directory, shell=False, start_new_session=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
with open(f'{working_directory}/ipc.fifo', 'wb') as wpid:
wpid.write(process.pid)
@shared_task(bind=True, name="Test")
def run(self, cmd, *args):
working_directory = '/var/tmp/workdir'
if not os.path.exists(working_directory):
os.makedirs(working_directory, mode=0o700)
ipc = f'{working_directory}/ipc.fifo'
if os.path.exists(ipc):
os.remove(ipc)
os.mkfifo(ipc)
pid1 = os.fork()
if pid1 == 0:
os.setsid()
os.umask(0)
pid2 = os.fork()
if pid2 > 0:
sys.exit(0)
os.setsid()
os.umask(0)
launcher2(working_directory, cmd, *args)
else:
with os.fdopen(os.open(ipc, flags=os.O_NONBLOCK | os.O_RDONLY), 'rb') as ripc:
readers, _, _ = select.select([ripc], [], [], 15)
if not readers:
raise TimeoutError(60, 'Timed out', ipc)
reader = readers.pop()
pid = struct.unpack('I', reader.read())[0]
pid, status = os.waitpid(pid, 0)
print(status)
if __name__ == '__main__':
async_result = run.apply_async(('/usr/bin/sleep', '15'), queue='q2')
print(async_result.get())
Мой сценарий использования более сложный, но я не думаю, что кто-то захочет прочитать 200+ строк начальной загрузки, но это не получается точно так же. С другой стороны, я не жду pid, если это не требуется, так что это все равно, что запустить процесс по запросу и позволить ему выполнить свою работу. Начальная загрузка базы данных занимает около минуты с полной настройкой, и я не хочу, чтобы клиенты стояли в течение минуты. Запрос приходит, я порождаю процесс и отправляю обратно идентификатор для экземпляра базы данных, и клиент может запросить статус на основе полученного идентификатора экземпляра. Однако с вышеупомянутым решением для разветвления я получаю:
[2020-01-20 18:03:17,760: INFO/MainProcess] Received task: Test[dbebc31c-7929-4b75-ae28-62d3f9810fd9]
[2020-01-20 18:03:20,859: ERROR/MainProcess] Process 'ForkPoolWorker-2' pid:16634 exited with 'signal 15 (SIGTERM)'
[2020-01-20 18:03:20,877: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 15 (SIGTERM).')
Traceback (most recent call last):
File "/home/pupsz/PycharmProjects/provider/venv37/lib/python3.7/site-packages/billiard/pool.py", line 1267, in mark_as_worker_lost
human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 15 (SIGTERM).
Что заставляет меня задуматься о том, что может происходить. Я попытался выполнить еще более простую задачу:
@shared_task(bind=True, name="Test")
def run(self, cmd, *args):
working_directory = '/var/tmp/workdir'
if not os.path.exists(working_directory):
os.makedirs(working_directory, mode=0o700)
command = [cmd]
command.extend(list(args))
process = subprocess.Popen(command, cwd=working_directory, shell=False, start_new_session=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return process.wait()
if __name__ == '__main__':
async_result = run.apply_async(('/usr/bin/sleep', '15'), queue='q2')
print(async_result.get())
, которая снова завершается с той же ошибкой. Сейчас мне нравится сельдерей, но от этого мне кажется, что он не подходит для моих нужд. Я что-то испортил? Можно ли этого добиться, что мне нужно сделать от рабочего? Есть ли у меня какие-либо альтернативы или я просто должен написать свою очередь задач?