Предположим, у меня есть несколько хостов, доступных по SSH, и (гораздо большее) количество команд, которые я могу отправить любому из этих хостов.Следующие предположения верны:
- Команды должны планироваться на основе FIFO.
- Время выполнения каждой команды unknown и host-independent .
- Один хост может выполнять только одну команду за один раз.
- Как только на каком-либо хосте происходит сбой любой команды, я хочу прервать все выполнения и вызвать исключение.
Имея это в виду, я реализовал некоторый код на основе concurrent.futures
для выполнения этого планирования.Автономная версия представлена ниже и предполагает, что вы можете использовать SSH для localhost
.
import concurrent.futures
import shlex
import subprocess
import random
import logging
from collections import deque
logging.basicConfig(format=logging.BASIC_FORMAT, level=logging.DEBUG)
random.seed(13)
MAX_DURATION = 10
NUM_COMMANDS = 100
NUM_HOSTS = 5
HOSTS = ['localhost'] * NUM_HOSTS
def simulate_remote_call(host: str) -> None:
duration = random.randrange(1, MAX_DURATION + 1)
cmd = ['sleep', str(duration)]
quoted_cmd = [shlex.quote(token) for token in cmd]
logging.debug("Sleeping for %d seconds on %s", duration, host)
subprocess.check_call(['ssh', host, *quoted_cmd])
logging.debug("Done sleeping on %s", host)
with concurrent.futures.ThreadPoolExecutor(max_workers=len(HOSTS)) as executor:
it = iter(range(1, NUM_COMMANDS + 1))
queue = deque(HOSTS, len(HOSTS))
future_to_host = {}
while True:
return_when = concurrent.futures.FIRST_EXCEPTION
while queue:
try:
command = next(it)
except StopIteration:
break
else:
host = queue.popleft()
logging.debug("Submitting command %d", command)
future = executor.submit(simulate_remote_call, host)
future_to_host[future] = host
else:
return_when = concurrent.futures.FIRST_COMPLETED
if not future_to_host:
break
done, not_done = concurrent.futures.wait(future_to_host,
return_when=return_when)
for future in done:
host = future_to_host[future]
exception = future.exception()
if exception is not None:
for other in not_done:
other.cancel()
raise exception
del future_to_host[future]
queue.append(host)
Однако , этот код кажется очень неестественным, и я ищу более простой способчтобы сделать это.Есть идеи?