Планирование FIFO с concurrent.futures - PullRequest
0 голосов
/ 18 июня 2019

Предположим, у меня есть несколько хостов, доступных по SSH, и (гораздо большее) количество команд, которые я могу отправить любому из этих хостов.Следующие предположения верны:

  • Команды должны планироваться на основе FIFO.
  • Время выполнения каждой команды unknown и host-independent .
  • Один хост может выполнять только одну команду за один раз.
  • Как только на каком-либо хосте происходит сбой любой команды, я хочу прервать все выполнения и вызвать исключение.

Имея это в виду, я реализовал некоторый код на основе concurrent.futures для выполнения этого планирования.Автономная версия представлена ​​ниже и предполагает, что вы можете использовать SSH для localhost.

import concurrent.futures
import shlex
import subprocess
import random

import logging

from collections import deque


logging.basicConfig(format=logging.BASIC_FORMAT, level=logging.DEBUG)

random.seed(13)

MAX_DURATION = 10
NUM_COMMANDS = 100
NUM_HOSTS = 5
HOSTS = ['localhost'] * NUM_HOSTS


def simulate_remote_call(host: str) -> None:
    duration = random.randrange(1, MAX_DURATION + 1)
    cmd = ['sleep', str(duration)]
    quoted_cmd = [shlex.quote(token) for token in cmd]
    logging.debug("Sleeping for %d seconds on %s", duration, host)
    subprocess.check_call(['ssh', host, *quoted_cmd])
    logging.debug("Done sleeping on %s", host)


with concurrent.futures.ThreadPoolExecutor(max_workers=len(HOSTS)) as executor:
    it = iter(range(1, NUM_COMMANDS + 1))
    queue = deque(HOSTS, len(HOSTS))
    future_to_host = {}

    while True:
        return_when = concurrent.futures.FIRST_EXCEPTION

        while queue:
            try:
                command = next(it)
            except StopIteration:
                break
            else:
                host = queue.popleft()
                logging.debug("Submitting command %d", command)
                future = executor.submit(simulate_remote_call, host)
                future_to_host[future] = host
        else:
            return_when = concurrent.futures.FIRST_COMPLETED

        if not future_to_host:
            break

        done, not_done = concurrent.futures.wait(future_to_host,
                                                 return_when=return_when)

        for future in done:
            host = future_to_host[future]
            exception = future.exception()

            if exception is not None:
                for other in not_done:
                    other.cancel()

                raise exception

            del future_to_host[future]
            queue.append(host)

Однако , этот код кажется очень неестественным, и я ищу более простой способчтобы сделать это.Есть идеи?

...