Очистить запущенный дочерний процесс, запущенный из Flask MethodView API - PullRequest
1 голос
/ 09 октября 2019

Я создаю API-интерфейс Flask MethodView . Для конкретной конечной точки я использую данные запроса для запуска потенциально длительной команды. Вместо того, чтобы ждать завершения команды, я обертываю ее в multiprocessing.Process, вызываю start, а затем возвращаю пользователю HTTP 202 вместе с URL-адресом, который они могут использовать для мониторинга состояния процесса.

class EndPointAPI(MethodView):

    def __init__(self):
        """ On init, filter requests missing JSON body."""

        # Check for json payload
        self.except = ["GET", "PUT", "DELETE" ]                                                                                       
        if (request.method not in self.except) and not request.json: 
            abort(400)            

    def _long_running_function(self, json_data):
        """ 
        In this function, I use the input JSON data 
        to write a script to the file system, then 
        use subprocess.run to execute it.
        """
        return

    def post(self):
        """ """

        # Get input data
        json_data = request.json

        # Kick off the long running function
        p = Process(target=long_running_function, args=(json_data,))
        p.start()

        response = {
            "result" : "job accepted",
            "links" : {
                "href" : "/monitor_job/",
            }

        }

        return jsonify(response), 202

Похоже, процессы, запущенные в методе post, становятся зомби после их завершения, но я не могу понять, как правильно отслеживать и очищать их, не блокируя выполнениеродительского метода . Я попытался реализовать поток мониторинга, как это было предложено в . Присоединение Python к процессу без блокировки родительского элемента . Насколько я понимаю, он предлагает запустить отдельный поток, который отслеживает очередь FIFO, а затем поместить дескриптор процесса в очередь перед возвратом родительской функции. Я попробовал реализацию (ниже), но похоже, что вы не можете передать объект процесса в поток, поскольку он содержит защищенный атрибут AuthenticationString.

Traceback (most recent call last):
|   File "/opt/miniconda3/envs/m137p3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
|     obj = _ForkingPickler.dumps(obj)
|   File "/opt/miniconda3/envs/m137p3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
|     cls(buf, protocol).dump(obj)
|   File "/opt/miniconda3/envs/m137p3/lib/python3.6/multiprocessing/process.py", line 291, in __reduce__
 |     'Pickling an AuthenticationString object is '
| TypeError: Pickling an AuthenticationString object is disallowed for security reasons

Это моя реализация Python присоединяется к процессу, не блокируя родителя . Я понятия не имею, будет ли это работать, потому что вышеупомянутая ошибка выключает всю систему с самого начала. Будем очень благодарны за любые мысли или предложения о том, как я могу ответственно запускать эти процессы, не блокируя вызывающий метод.

from threading import Thread
from multiprocessing import Queue, ...

class Joiner(Thread):

    def __init__(self, q):
        super().__init__()
        self.__q = q

    def run(self):
        while True:
            child = self.__q.get()
            if child == None:
                return
            child.join()

class EndPointAPI(MethodView):

    def __init__(self):
        """ On init, filter requests missing JSON body."""
        self._jobs = Queue()            
        self._babysitter = Joiner(self._jobs)
        self._babysitter.start()

        # Check for json payload
        self.except = ["GET", "PUT", "DELETE" ]                                                                                       
        if (request.method not in self.except) and not request.json: 
            abort(400)            

    def _long_running_function(self, json_data):
        """ 
        In this function, I use the input JSON data 
        to write a script to the file system, then 
        use subprocess.run to execute it.
        """
        return

    def post(self):
        """ """

        # Get input data
        json_data = request.json

        # Kick off the long running function
        p = Process(target=long_running_function, args=(json_data,))
        p.start()
        self._jobs.put(p)

        response = {
            "result" : "job accepted",
            "links" : {
                "href" : "/monitor_job/",
            }

        }

        return jsonify(response), 202

1 Ответ

1 голос
/ 09 октября 2019

Вы были так близко :) Все выглядит просто отлично, кроме одной вещи: вы используете multiprocessing.Queue для хранения запущенных процессов, чтобы позже присоединить их к экземпляру Joiner. Из документов вы узнаете следующее

Примечание. Когда объект помещается в очередь, объект обрабатывается и фоновый поток позже сбрасывает обработанные данные внижележащий канал.

То есть процесс сериализуется при помещении в очередь, что приводит к следующей ошибке

TypeError: выбор объекта AuthenticationString запрещен по соображениям безопасности

Это происходит из-за уникального ключа аутентификации , который есть у каждого процесса. Этот ключ является байтовой строкой, которую можно рассматривать как пароль типа multiprocessing.process.AuthenticationString, и его невозможно выбрать.

Решение простое, просто используйте экземпляр queue.Queue для хранения ваших долго выполняющихся процессов. ,Вот рабочий пример:

#!/usr/bin/env python3
import os
import time
from queue import Queue
from threading import Thread
from multiprocessing import Process


class Joiner(Thread):

    def __init__(self):
        super().__init__()
        self.workers = Queue()

    def run(self):

        while True:
            worker = self.workers.get()

            if worker is None:
                break

            worker.join()


def do_work(t):
    pid = os.getpid()
    print('Process', pid, 'STARTED')
    time.sleep(t)
    print('Process', pid, 'FINISHED')


if __name__ == '__main__':
    joiner = Joiner()
    joiner.start()

    for t in range(1, 6, 2):
        p = Process(target=do_work, args=(t,))
        p.start()
        joiner.workers.put(p)

    joiner.workers.put(None)
    joiner.join()

Вывод:

Process 14498 STARTED
Process 14500 STARTED
Process 14499 STARTED
Process 14498 FINISHED
Process 14499 FINISHED
Process 14500 FINISHED
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...