Как управлять очередью задач в Python и запускать эти задачи параллельно на нескольких компьютерах? - PullRequest
1 голос
/ 25 апреля 2019

Я ищу библиотеку python, которая позволяет: управлять очередью задач, выполнять задачи параллельно (на одном или нескольких компьютерах), разрешать, чтобы задача могла генерировать другие задачи в очереди и была совместима с UNIX и Windows.

Я прочитал некоторые документы о Celery, RQ, SCoOP, многопроцессорной обработке для части диспетчера задач и redis, rabbitMQ и ZMQ для части брокера сообщений, но я не знаю, какой вариант лучше.

1 Ответ

0 голосов
/ 25 апреля 2019

Рассмотрим многопроцессорную библиотеку Python .

Это позволяет использовать многие многопроцессорные функции, такие как запуск нескольких процессов в виде пула рабочих с использованием очереди работ. Он работает на одном сервере, но вы могли бы реализовать соединитель, который выполняет работу на другом сервере (например, через SSH и удаленно запускать исполняемый файл python).

В противном случае мне неизвестна библиотека Python, которая может работать на разных серверах и на разных платформах. Вам может понадобиться приложение в контейнере - что-то вроде Kubernetes.

Ниже приведен пример кода, который я написал, который добавляет «идентификаторы задач» в очередь, представляющую выполняемые задачи. Затем они могут выполняться параллельно пулом рабочих.

import time
from multiprocessing import Queue, Pool, Process
from Queue import Empty

# For writing to logs when using multiprocessing
import logging
from multiprocessing_logging import install_mp_handler()


class RuntimeHelper:
    """
    Wrapper to your "runtime" which can execute runs and is persistant within a worker thread.
    """
    def __init__(self):
        # Implement your own code here
        # Do some initialisation such as creating DB connections etc
        # Will be done once per worker when the worker starts
        pass

    def execute_run(self, run_id):
        # Implement your own code here to actually do the Run/Task.
        # In this case we just sleep for 30 secs instead of doing any real work
        time.sleep(30)
        pass


def worker(run_id_queue):
    """
    This function will be executed once by a Pool of Processes using multiprocessing.Pool
    :param queue: The thread-safe Queue of run_ids to use
    :return:
    """
    helper = RuntimeHelper()
    # Iterate runs until death
    logging.info("Starting")
    while True:
        try:
            run_id = run_id_queue.get_nowait()
            # A run_id=None is a signal to this process to die
            # An empty queue means: dont die, the queue is just empty for now and more work could be added soon
            if run_id is not None:
                logging.info("run_id={0}".format(run_id))
                helper.execute_run(run_id)
            else:
                logging.info("Kill signal received")
                return True
        except Empty:
            # Wait X seconds before checking for new work
            time.sleep(15)


if __name__ == '__main__':
    num_processes = 10
    check_interval_seconds = 15
    max_runtime_seconds = 60*15

    # ==========================================
    # INITIALISATION
    # ==========================================
    install_mp_handler() # Must be called before Pool is create

    queue = Queue()
    pool = Pool(num_processes, worker, (queue,))
    # don't forget the coma here  ^

    # ==========================================
    # LOOP
    # ==========================================

    logging.info('Starting to do work')

    # Naive wait-loop implementation
    max_iterations = max_runtime_seconds / check_interval_seconds
    for i in range(max_iterations):
        # Add work
        ready_runs = <Your code to get some runs>
        for ready_run in ready_runs:
            queue.put(ready_run.id)
        # Sleep while some of the runs are busy
        logging.info('Main thread sleeping {0} of {1}'.format(i, max_iterations))
        time.sleep(check_interval_seconds)

    # Empty the queue of work and send the kill signal (run_id = None)
    logging.info('Finishing up')
    while True:
        try:
            run_id = queue.get_nowait()
        except Empty:
            break
    for i in range(num_processes):
        queue.put(None)
    logging.info('Waiting for subprocesses')

    # Wait for the pool finish what it is busy with
    pool.close()
    pool.join()
    logging.info('Done')
...