Как делать повторные запросы на задания - PullRequest
2 голосов
/ 09 ноября 2019

Я хочу сделать повторные запросы к серверу, который вернется с некоторыми задачами. Ответом от сервера будет словарь со списком функций, которые необходимо вызвать. Например:

{ 
   tasks: [
      {
         function: "HelloWorld",
         id: 1212
      },
      {
         function: "GoodbyeWorld"
         id: 1222
      }
   ]
}

ПРИМЕЧАНИЕ : я не понимаю.

Для каждой из этих задач я буду запускать указанную функцию, используя multiprocessing. Вот пример моего кода:

r = requests.get('https://localhost:5000', auth=('user', 'pass'))
data = r.json()

if len(data["tasks"]) > 0:
  manager = multiprocessing.Manager()
  for task in data["tasks"]:
    if task["function"] == "HelloWorld":
      helloObj = HelloWorldClass()
      hello = multiprocessing.Process(target=helloObj.helloWorld)
      hello.start()
      hello.join()
    elif task["function"] == "GoodbyeWorld":
      byeObj = GoodbyeWorldClass()
      bye = multiprocessing.Process(target=byeObj.byeWorld)
      bye.start()
      bye.join()

Проблема в том, что я хочу делать повторные запросы и заполнять массив data["tasks"], когда другие процессы выполняются. Если я добавлю все в какой-то цикл while, он сделает запрос только после того, как будут выполнены все процессы из первоначального ответа (когда достигнут join() для всех процессов).

Может кто-нибудь помочьмне делать повторные запросы и заполнять массив непрерывно? Пожалуйста, дайте мне знать, если мне нужно сделать какие-либо разъяснения.

Ответы [ 2 ]

2 голосов
/ 13 ноября 2019

Если я вас правильно понял, вам нужно что-то вроде этого:

import time
from multiprocessing import Process

import requests

from task import FunctionFactory


def get_tasks():
    resp = requests.get('https://localhost:5000', auth=('user', 'pass'))
    data = resp.json()

    return data['tasks']


if __name__ == '__main__':
    procs = {}

    for _ in range(10):
        tasks = get_tasks()

        if not tasks:
            time.sleep(5)
            continue

        for task in tasks:

            if task['id'] in procs:
                # This task has been already submitted for execution.
                continue

            func = FunctionFactory.build(task['function'])

            proc = Process(target=func)
            proc.start()

            procs[task['id']] = proc

    # Waiting for all the submitted tasks to finish.
    for proc in procs.values():
        proc.join()

Здесь функция get_tasks используется для запроса списка словарей с клавишами id и function отсервер. В главном разделе есть словарь procs, который отображает id на запущенные экземпляры процессов, которые выполняют функции, построенные на FunctionFactory, используя имена function полученных задач. В случае, если уже есть запущенная задача с тем же идентификатором, она игнорируется.

При таком подходе вы можете запрашивать задачи так часто, как это необходимо (здесь 10 запросы используются в forцикл) и запустить процессы для их параллельного выполнения. В конце концов, вы просто ждете завершения всех представленных задач.

1 голос
/ 15 ноября 2019

В вашей программе есть ошибка, вы должны вызвать join s после того, как вы создали все задачи . Присоединяйся к блокам, пока процесс не закончится - в твоем случае, прежде чем начать следующий. Что практически заставляет вас всю программу работать последовательно.

...