Как вы моделируете что-то со временем в Python? - PullRequest
7 голосов
/ 05 марта 2020

Я ищу тип данных, который помог бы мне моделировать доступность ресурсов в течение продолжительного времени.

  • Мы открыты с 9 до 6 и можем выполнять 5 параллельных заданий. В моей воображаемой среде программирования я только что инициализировал объект с таким диапазоном со значением 3 по всей доске.
  • У нас есть встречи в книгах, у каждого есть время начала и окончания.
  • Мне нужно пробивать каждый из них вне дня
  • Это оставляет мне график сорта, где доступность увеличивается и уменьшается, но в конечном итоге позволяет мне быстро находить временные диапазоны, где есть оставшаяся доступность.

Я пришел к этой проблеме со многих сторон, но всегда возвращаюсь к фундаментальной проблеме, заключающейся в незнании типа данных для моделирования чего-либо столь же простого, как целое число во времени.

I может преобразовать мои встречи в события временного ряда (например, встреча приходит означает -1 доступность, встреча оставляет означает +1), но я все еще не знаю, как манипулировать этими данными, чтобы можно было выделить периоды, когда доступность больше нуля.


Кто-то оставил закрытое голосование, сославшись на отсутствие фокуса, но моя цель здесь кажется пр etty единственное число, поэтому я постараюсь объяснить проблему графически. Я пытаюсь определить периоды времени, когда количество активных заданий падает ниже заданной емкости.

enter image description here

Превращение диапазона известной параллельной емкости (например, 3 между 9- 6) и список заданий с переменным началом / концом в список временных диапазонов доступного времени.

Ответы [ 6 ]

8 голосов
/ 05 марта 2020

Мой подход заключается в построении временного ряда, но с включением объекта доступности со значением, установленным для доступности в этом периоде.

availability: 
[
  {
    "start": 09:00,
    "end": 12:00,
    "value": 4
  },
  {
     "start": 12:00,
     "end": 13:00,
     "value": 3
  }
]
data: [
  {
    "start": 10:00,
    "end": 10:30,
  }
]

Построение индексации временного ряда в начале / конце раз, со значением в качестве значения. Время начала доступности + значение, время окончания - значение. Для события это будет -1 или +1, как вы сказали.

"09:00" 4
"10:00" -1
"10:30" 1
"12:00" -4
"12:00" 3
"13:00" -3

Затем группируйте по индексу, сумме и совокупной сумме.

получая:

"09:00" 4
"10:00" 3
"10:30" 4
"12:00" 3
"13:00" 0

Пример кода в pandas:

import numpy as np
import pandas as pd


data = [
  {
    "start": "10:00",
    "end": "10:30",
  }
]

breakpoints = [
  {
    "start": "00:00",
    "end": "09:00",
    "value": 0
  },
  {
    "start": "09:00",
    "end": "12:00",
    "value": 4
  },
  {
    "start": "12:00",
    "end": "12:30",
    "value": 4
  },
  {
    "start": "12:30",
    "end": "13:00",
    "value": 3
  },
  {
    "start": "13:00",
    "end": "00:00",
    "value": 0
  }
]

df = pd.DataFrame(data, columns=['start', 'end'])

print(df.head(5))

starts = pd.DataFrame(data, columns=['start'])
starts["value"] = -1
starts = starts.set_index("start")

ends = pd.DataFrame(data, columns=['end'])
ends["value"] = 1
ends = ends.set_index("end")

breakpointsStarts = pd.DataFrame(breakpoints, columns=['start', 'value']).set_index("start")

breakpointsEnds = pd.DataFrame(breakpoints, columns=['end', 'value'])
breakpointsEnds["value"] = breakpointsEnds["value"].transform(lambda x: -x)
breakpointsEnds = breakpointsEnds.set_index("end")

countsDf = pd.concat([starts, ends, breakpointsEnds, breakpointsStarts]).sort_index()
countsDf = countsDf.groupby(countsDf.index).sum().cumsum()

print(countsDf)

# Periods that are available

df = countsDf
df["available"] = df["value"] > 0

# Indexes where the value of available changes
# Alternatively swap out available for the value.
time_changes = df["available"].diff()[df["available"].diff() != 0].index.values
newDf = pd.DataFrame(time_changes, columns= ["start"])

# Setting the end column to the value of the next start
newDf['end'] = newDf.transform(np.roll, shift=-1)
print(newDf)

# Join this back in to get the actual value of available
mergedDf = newDf.merge(df, left_on="start", right_index=True)

print(mergedDf)

возвращение в конце:

   start    end  value  available
0  00:00  09:00      0      False
1  09:00  13:00      4       True
2  13:00  00:00      0      False
2 голосов
/ 10 марта 2020

Для меня эта проблема была бы хорошо представлена ​​списком логических значений. Для простоты объяснения предположим, что продолжительность каждой потенциальной работы кратна 15 минутам. Итак, с 9 до 6 у нас есть 135 «временных интервалов», для которых мы хотим отслеживать доступность. Мы представляем доступность очереди во временном интервале с логическими переменными: False, если очередь обрабатывает задание, True, если очередь доступна.

Сначала мы создаем список временных интервалов для каждой очереди, а также вывод. Таким образом, каждая очередь и выход имеют временные интервалы t k , 1 <= k <= 135. </p>

Тогда, учитывая пять очередей заданий, q j , 1 <= j <= 5, мы говорим, что t <sub>k"открыто" в момент времени k, если существует хотя бы один q j , где список временных интервалов в индексе k равен True.

Мы можем реализовать это в автономном Python следующим образом:

slots = [ True ] * 135
queues = [ slots ] * 5
output = [ False ] * 135

def available (k):

 for q in queues:
  if q[k]:
   return True

 return False

Затем можно предположить, что существует некоторая функция dispatch (length), которая назначает задание доступной очереди, устанавливая соответствующие слоты от queue[q] до False.

Наконец, чтобы обновить вывод, мы просто вызываем:

def update():

 for k in range(0, 135):
  output[k] = available[k]

Или, для повышения эффективности:

def update(i, j):
 for k in range(i, j):
  output[k] = available[k]

Затем вы можете просто позвонить update(i, j) всякий раз, когда dispatch() обновляет временные интервалы i через j для новой работы. Таким образом, диспетчеризация и обновление - это операция O (n), где n - это количество временных интервалов, которые меняются, независимо от того, сколько существует временных интервалов.

Это позволит вам создать простую функцию, которая отображает удобочитаемое время на диапазон значений временных интервалов, что позволит увеличивать или уменьшать временные интервалы при увеличении sh.

Вы также можете легко расширить эту идею, чтобы использовать фрейм данных pandas, где каждый столбец представляет собой одну очередь, что позволяет вам использовать Series.any() в каждой строке сразу для быстрого обновления выходного столбца.

Хотелось бы услышать предложения относительно этого подхода! Возможно, есть сложность проблемы, которую я пропустил, но я думаю, что это хорошее решение.

2 голосов
/ 05 марта 2020

Я бы подошел к этому так же, как вы сделали с назначениями. Моделируйте свободное время как встречи самостоятельно. Для каждой конечной встречи проверьте, есть ли еще один на продолжающемся, если так, пропустите здесь. Если нет, найдите следующую начальную встречу (та, чья дата начала больше указанной даты).

После того, как вы итерировали все свои встречи, у вас должна быть инвертированная маска.

1 голос
/ 13 марта 2020

Вы можете использовать (datetime, increment) кортежи, чтобы отслеживать изменения в доступности. Событие job-start имеет increment = 1, а событие end-end имеет increment = -1. Тогда itertools.accumulate позволяет рассчитать совокупную доступность, когда задания начинаются и заканчиваются со временем. Вот пример реализации:

from datetime import time
import itertools as it

def compute_availability(jobs, opening_hours, capacity):
    jobs = [((x, -1), (y, +1)) for x, y in jobs]
    opens, closes = opening_hours
    events = [[opens, capacity]] + sorted(t for job in jobs for t in job) + [(closes, 0)]
    availability = list(it.accumulate(events,
                                      lambda x, y: [y[0], x[1] + y[1]]))
    for x, y in zip(availability, availability[1:]):
        # If multiple events happen at the same time, only yield the last one.
        if y[0] > x[0]:
            yield x

Это добавляет искусственные события (opens, capacity) и (closes, 0) для инициализации вычисления. В приведенном выше примере рассматривается один день, но его легко расширить до нескольких дней, создав opens и closes datetime объектов, которые совместно используют день первого и последнего задания соответственно.

Пример

Вот вывод для примера расписания OP:

from pprint import pprint

jobs = [(time(10), time(15)),
        (time(9), time(11)),
        (time(12, 30), time(16)),
        (time(10), time(18))]

availability = list(compute_availability(
    jobs, opening_hours=(time(9), time(18)), capacity=3
))
pprint(availability)

, который печатает:

[[datetime.time(9, 0), 2],
 [datetime.time(10, 0), 0],
 [datetime.time(11, 0), 1],
 [datetime.time(12, 30), 0],
 [datetime.time(15, 0), 1],
 [datetime.time(16, 0), 2]]

Первый элемент указывает, когда изменяется доступность и второй элемент обозначает доступность, которая является результатом этого изменения. Например, в 9 утра отправляется одно задание, в результате чего доступность падает с 3 до 2, а затем в 10 утра отправляются еще два задания, пока первое еще работает (следовательно, доступность снижается до 0).

Добавление новых заданий

Теперь, когда мы рассчитали начальную доступность, важным аспектом является ее обновление по мере добавления новых заданий. Здесь желательно не пересчитывать доступность из полного списка заданий, поскольку это может быть дорогостоящим, если отслеживается много заданий. Поскольку availability уже отсортирован, мы можем использовать модуль bisect, чтобы определить соответствующий диапазон обновления в O (log (N)). Затем необходимо выполнить ряд шагов. Допустим, задание запланировано как [x, y], где x, y являются двумя объектами даты и времени.

  1. Проверьте, является ли доступность в интервале [x, y] больше нуля (включая событие слева от x (то есть от предыдущего события)).
  2. Уменьшите доступность всех событий в [x, y] на 1.
  3. Если x отсутствует в списке события, которые мы должны добавить, в противном случае нам нужно проверить, можем ли мы объединить событие x с тем, которое ему осталось.
  4. Если y нет в списке событий, нам нужно добавить его .

Вот соответствующий код:

import bisect

def add_job(availability, job, *, weight=1):
    """weight: how many lanes the job requires"""
    job = list(job)
    start = bisect.bisect(availability, job[:1])
    # Emulate a `bisect_right` which doens't work directly since
    # we're comparing lists of different length.
    if start < len(availability):
        start += (job[0] == availability[start][0])
    stop = bisect.bisect(availability, job[1:])

    if any(slot[1] < weight for slot in availability[start-1:stop]):
        raise ValueError('The requested time slot is not available')

    for slot in availability[start:stop]:
        slot[1] -= weight

    if job[0] > availability[start-1][0]:
        previous_availability = availability[start-1][1]
        availability.insert(start, [job[0], previous_availability - weight])
        stop += 1
    else:
        availability[start-1][1] -= weight
        if start >= 2 and availability[start-1][1] == availability[start-2][1]:
            del availability[start-1]
            stop -= 1

    if stop == len(availability) or job[1] < availability[stop][0]:
        previous_availability = availability[stop-1][1]
        availability.insert(stop, [job[1], previous_availability + weight])

Пример расписания

Мы можем проверить это, добавив некоторые задания в пример расписания ОП:

for job in [[time(15), time(17)],
            [time(11, 30), time(12)],
            [time(13), time(14)]]:  # this one should raise since availability is zero
    print(f'\nAdding {job = }')
    add_job(availability, job)
    pprint(availability)

который выводит:

Adding job = [datetime.time(15, 0), datetime.time(17, 0)]
[[datetime.time(9, 0), 2],
 [datetime.time(10, 0), 0],
 [datetime.time(11, 0), 1],
 [datetime.time(12, 30), 0],
 [datetime.time(16, 0), 1],
 [datetime.time(17, 0), 2]]

Adding job = [datetime.time(11, 30), datetime.time(12, 0)]
[[datetime.time(9, 0), 2],
 [datetime.time(10, 0), 0],
 [datetime.time(11, 0), 1],
 [datetime.time(11, 30), 0],
 [datetime.time(12, 0), 1],
 [datetime.time(12, 30), 0],
 [datetime.time(16, 0), 1],
 [datetime.time(17, 0), 2]]

Adding job = [datetime.time(13, 0), datetime.time(14, 0)]
Traceback (most recent call last):
  [...]
ValueError: The requested time slot is not available

Блокировка ночных часов

Мы также можем использовать этот интерфейс для блокировки всех полос в часы, когда услуга недоступна (например, с 18:00 до 9:00). на следующий день). Просто отправьте задание с weight=capacity на этот промежуток времени:

add_job(availability,
        [datetime(2020, 3, 14, 18), datetime(2020, 3, 15, 9)]
        weight=3)

Создание полного расписания с нуля

Мы также можем использовать add_job для создания полного расписания с нуля:

availability = availability = list(compute_availability(
    [], opening_hours=(time(9), time(18)), capacity=3
))
print('Initial availability')
pprint(availability)
for job in jobs:
    print(f'\nAdding {job = }')
    add_job(availability, job)
    pprint(availability)

который выводит:

Initial availability
[[datetime.time(9, 0), 3]]

Adding job = (datetime.time(10, 0), datetime.time(15, 0))
[[datetime.time(9, 0), 3],
 [datetime.time(10, 0), 2],
 [datetime.time(15, 0), 3]]

Adding job = (datetime.time(9, 0), datetime.time(11, 0))
[[datetime.time(9, 0), 2],
 [datetime.time(10, 0), 1],
 [datetime.time(11, 0), 2],
 [datetime.time(15, 0), 3]]

Adding job = (datetime.time(12, 30), datetime.time(16, 0))
[[datetime.time(9, 0), 2],
 [datetime.time(10, 0), 1],
 [datetime.time(11, 0), 2],
 [datetime.time(12, 30), 1],
 [datetime.time(15, 0), 2],
 [datetime.time(16, 0), 3]]

Adding job = (datetime.time(10, 0), datetime.time(18, 0))
[[datetime.time(9, 0), 2],
 [datetime.time(10, 0), 0],
 [datetime.time(11, 0), 1],
 [datetime.time(12, 30), 0],
 [datetime.time(15, 0), 1],
 [datetime.time(16, 0), 2],
 [datetime.time(18, 0), 3]]
0 голосов
/ 14 марта 2020

Вы можете использовать выделенный класс, представляющий полосу, которая может выполнять задания. Эти объекты могут отслеживать задания и, соответственно, их доступность:

import bisect
from datetime import time
from functools import total_ordering
import math


@total_ordering
class TimeSlot:
    def __init__(self, start, stop, lane):
        self.start = start
        self.stop = stop
        self.lane = lane

    def __contains__(self,  other):
        return self.start <= other.start and self.stop >= other.stop

    def __lt__(self, other):
        return (self.start, -self.stop.second) < (other.start, -other.stop.second)

    def __eq__(self, other):
        return (self.start, -self.stop.second) == (other.start, -other.stop.second)

    def __str__(self):
        return f'({self.lane}) {[self.start, self.stop]}'

    __repr__ = __str__


class Lane:
    @total_ordering
    class TimeHorizon:
        def __repr__(self):
            return '...'
        def __lt__(self, other):
            return False
        def __eq__(self, other):
            return False
        @property
        def second(self):
            return math.inf
        @property
        def timestamp(self):
            return math.inf

    time_horizon = TimeHorizon()
    del TimeHorizon

    def __init__(self, start, nr):
        self.nr = nr
        self.availability = [TimeSlot(start, self.time_horizon, self)]

    def add_job(self, job):
        if not isinstance(job, TimeSlot):
            job = TimeSlot(*job, self)
        # We want to bisect_right but only on the start time,
        # so we need to do it manually if they are equal.
        index = bisect.bisect_left(self.availability, job)
        if index < len(self.availability):
            index += (job.start == self.availability[index].start)
        index -= 1  # select the corresponding free slot
        slot = self.availability[index]
        if slot.start > job.start or slot.stop is not self.time_horizon and job.stop > slot.stop:
            raise ValueError('Requested time slot not available')
        if job == slot:
            del self.availability[index]
        elif job.start == slot.start:
            slot.start = job.stop
        elif job.stop == slot.stop:
            slot.stop = job.start
        else:
            slot_end = slot.stop
            slot.stop = job.start
            self.availability.insert(index+1, TimeSlot(job.stop, slot_end, self))

A Lane объект может использоваться следующим образом:

lane = Lane(start=time(9), nr=1)
print(lane.availability)
lane.add_job([time(11), time(14)])
print(lane.availability)

, который выводит:

[(1) [datetime.time(9, 0), ...]]
[(1) [datetime.time(9, 0), datetime.time(11, 0)],
 (1) [datetime.time(14, 0), ...]]

После добавления задания обновляется и доступность.

Теперь мы можем использовать несколько этих объектов дорожек вместе для представления полного расписания. Задания могут быть добавлены по мере необходимости, и доступность будет обновляться автоматически:

class Schedule:
    def __init__(self, n_lanes, start):
        self.lanes = [Lane(start, nr=i) for i in range(n_lanes)]

    def add_job(self, job):
        for lane in self.lanes:
            try:
                lane.add_job(job)
            except ValueError:
                pass
            else:
                break

Тестирование по примеру расписания

from pprint import pprint

# Example jobs from OP.
jobs = [(time(10), time(15)),
        (time(9), time(11)),
        (time(12, 30), time(16)),
        (time(10), time(18))]

schedule = Schedule(3, start=time(9))
for job in jobs:
    schedule.add_job(job)

for lane in schedule.lanes:
    pprint(lane.availability)

, которое выводит:

[(0) [datetime.time(9, 0), datetime.time(10, 0)],
 (0) [datetime.time(15, 0), ...]]
[(1) [datetime.time(11, 0), datetime.time(12, 30)],
 (1) [datetime.time(16, 0), ...]]
[(2) [datetime.time(9, 0), datetime.time(10, 0)],
 (2) [datetime.time(18, 0), ...]]

Автоматизация c балансировка нагрузки для заданий

Мы можем создать выделенную древовидную структуру, которая отслеживает временные интервалы всех полос для выбора наиболее подходящего слота при регистрации нового задания. Узел в дереве представляет один временной интервал, а его дочерние элементы - это все временные интервалы, которые содержатся в этом интервале. Затем при регистрации новой работы мы можем искать дерево, чтобы найти оптимальный слот. Дерево и дорожки имеют одни и те же временные интервалы, поэтому нам нужно настраивать слоты только вручную, когда они удалены или вставлены новые. Вот соответствующий код, он немного длинен (просто быстрый набросок):

import itertools as it

class OneStepBuffered:
    """Can back up elements that are consumed by `it.takewhile`.
       From: https://stackoverflow.com/a/30615837/3767239
    """
    _sentinel = object()

    def __init__(self, it):
        self._it = iter(it)
        self._last = self._sentinel
        self._next = self._sentinel

    def __iter__(self):
        return self

    def __next__(self):
        sentinel = self._sentinel
        if self._next is not sentinel:
            next_val, self._next = self._next, sentinel
            return next_val
        try:
            self._last = next(self._it)
            return self._last
        except StopIteration:
            self._last = self._next = sentinel
            raise

    def step_back(self):
        if self._last is self._sentinel:
            raise ValueError("Can't back up a step")
        self._next, self._last = self._last, self._sentinel


class SlotTree:
    def __init__(self, slot, subslots, parent=None):
        self.parent = parent
        self.slot = slot
        self.subslots = []
        slots = OneStepBuffered(subslots)
        for slot in slots:
            subslots = it.takewhile(lambda x: x.stop <= slot.stop, slots)
            self.subslots.append(SlotTree(slot, subslots, self))
            try:
                slots.step_back()
            except ValueError:
                break

    def __str__(self):
        sub_repr = ['\n|   '.join(str(slot).split('\n'))
                    for slot in self.subslots]
        sub_repr = [f'|   {x}' for x in sub_repr]
        sub_repr = '\n'.join(sub_repr)
        sep = '\n' if sub_repr else ''
        return f'{self.slot}{sep}{sub_repr}'

    def find_minimal_containing_slot(self, slot):
        try:
            return min(self.find_containing_slots(slot),
                       key=lambda x: x.slot.stop.second - x.slot.start.second)
        except ValueError:
            raise ValueError('Requested time slot not available') from None

    def find_containing_slots(self, slot):
        for candidate in self.subslots:
            if slot in candidate.slot:
                yield from candidate.find_containing_slots(slot)
                yield candidate

    @classmethod
    def from_slots(cls, slots):
        # Ascending in start time, descending in stop time (secondary).
        return cls(cls.__name__, sorted(slots))


class Schedule:
    def __init__(self, n_lanes, start):
        self.lanes = [Lane(start, i+1) for i in range(n_lanes)]
        self.slots = SlotTree.from_slots(
            s for lane in self.lanes for s in lane.availability)

    def add_job(self, job):
        if not isinstance(job, TimeSlot):
            job = TimeSlot(*job, lane=None)
        # Minimal containing slot is one possible strategy,
        # others can be implemented as well.
        slot = self.slots.find_minimal_containing_slot(job)
        lane = slot.slot.lane
        if job == slot.slot:
            slot.parent.subslots.remove(slot)
        elif job.start > slot.slot.start and job.stop < slot.slot.stop:
            slot.parent.subslots.insert(
                slot.parent.subslots.index(slot) + 1,
                SlotTree(TimeSlot(job.stop, slot.slot.stop, lane), [], slot.parent))
        lane.add_job(job)

Теперь мы можем использовать класс Schedule, чтобы автоматически назначать задания для дорожек и обновлять их доступность:

if __name__ == '__main__':
    jobs = [(time(10), time(15)),  # example from OP
            (time(9), time(11)),
            (time(12, 30), time(16)),
            (time(10), time(18))]

    schedule = Schedule(3, start=time(9))
    print(schedule.slots, end='\n\n')
    for job in jobs:
        print(f'Adding {TimeSlot(*job, "new slot")}')
        schedule.add_job(job)
        print(schedule.slots, end='\n\n')

, который выводит:

SlotTree
|   (1) [datetime.time(9, 0), ...]
|   (2) [datetime.time(9, 0), ...]
|   (3) [datetime.time(9, 0), ...]

Adding (new slot) [datetime.time(10, 0), datetime.time(15, 0)]
SlotTree
|   (1) [datetime.time(9, 0), datetime.time(10, 0)]
|   (1) [datetime.time(15, 0), ...]
|   (2) [datetime.time(9, 0), ...]
|   (3) [datetime.time(9, 0), ...]

Adding (new slot) [datetime.time(9, 0), datetime.time(11, 0)]
SlotTree
|   (1) [datetime.time(9, 0), datetime.time(10, 0)]
|   (1) [datetime.time(15, 0), ...]
|   (2) [datetime.time(11, 0), ...]
|   (3) [datetime.time(9, 0), ...]

Adding (new slot) [datetime.time(12, 30), datetime.time(16, 0)]
SlotTree
|   (1) [datetime.time(9, 0), datetime.time(10, 0)]
|   (1) [datetime.time(15, 0), ...]
|   (2) [datetime.time(11, 0), datetime.time(12, 30)]
|   (2) [datetime.time(16, 0), ...]
|   (3) [datetime.time(9, 0), ...]

Adding (new slot) [datetime.time(10, 0), datetime.time(18, 0)]
SlotTree
|   (1) [datetime.time(9, 0), datetime.time(10, 0)]
|   (1) [datetime.time(15, 0), ...]
|   (2) [datetime.time(11, 0), datetime.time(12, 30)]
|   (2) [datetime.time(16, 0), ...]
|   (3) [datetime.time(9, 0), datetime.time(10, 0)]
|   (3) [datetime.time(18, 0), ...]

Числа (i) обозначают номер полосы, а [] указывают доступные временные интервалы на этой полосе. A ... обозначает «открытый конец» (временной горизонт). Как мы видим, дерево не перестраивается, когда корректируются временные интервалы; это было бы возможным улучшением. В идеале для каждого нового задания соответствующий временной интервал наилучшего соответствия извлекается из дерева, а затем, в зависимости от того, как задание помещается в слоте, скорректированная версия и, возможно, новые слоты перемещаются обратно в дерево (или вообще ничего, если задание точно подходит под слот).

В приведенных выше примерах рассматриваются только один день и time объекты, но код легко расширяется для использования с datetime объектами.

0 голосов
/ 13 марта 2020

Если ваше разрешение по времени не превышает минуты, я бы предложил использовать карту минут в день с набором идентификаторов заданий, назначенных для промежутка времени каждого задания

Например:

# convert time to minute of the day (assumes24H time, but you can make this your own way)
def toMinute(time): 
    return sum(p*t for p,t in zip(map(int,time.split(":")),(60,1)))

def toTime(minute):
    return f"{minute//60}:{minute%60:02d}"

# booking a job adds it to all minutes covered by its duration
def book(timeMap,jobId,start,duration):
    startMin = toMinute(start)
    for m in range(startMin,startMin+duration):
        timeMap[m].add(jobId)

# unbooking a job removes it from all minutes where it was present
def unbook(timeMap,jobId):
    for s in timeMap:
        s.discard(jobId)

# return time ranges for minutes meeting a given condition
def minuteSpans(timeMap,condition,start="09:00",end="18:00"):
    start,end  = toMinute(start),toMinute(end)
    timeRange  = timeMap[start:end]
    match      = [condition(s) for s in timeRange]
    breaks     = [True] + [a!=b for a,b in zip(match,match[1:])]
    starts     = [i for (i,a),b in zip(enumerate(match),breaks) if b]
    return [(start+s,start+e) for s,e in zip(starts,starts[1:]+[len(match)]) if match[s]]

def timeSpans(timeMap,condition,start="09:00",end="18:00"):
    return [(toTime(s),toTime(e)) for s,e in minuteSpans(timeMap,condition,start,end)]

# availability is ranges of minutes where the number of jobs is less than your capacity
def available(timeMap,start="09:00",end="18:00",maxJobs=5):
    return timeSpans(timeMap,lambda s:len(s)<maxJobs,start,end)

пример использования:

timeMap = [set() for _ in range(1440)]

book(timeMap,"job1","9:45",25)
book(timeMap,"job2","9:30",45)
book(timeMap,"job3","9:00",90)

print(available(timeMap,maxJobs=3))
[('9:00', '9:45'), ('10:10', '18:00')]

print(timeSpans(timeMap,lambda s:"job3" in s))
[('9:00', '10:30')]

С некоторыми изменениями вы можете даже выполнять прерывистые задания, которые пропускаются в течение определенных периодов (например, во время обеда). Вы также можете заблокировать некоторые периоды, помещая в них фальшивые задания.

Если вам нужно индивидуально управлять очередями заданий, вы можете иметь отдельные временные карты (по одной на очередь) и объединять их в одно, когда вам нужно иметь глобальная картина:

 print(available(timeMap1,maxJobs=1))
 print(available(timeMap2,maxJobs=1))
 print(available(timeMap3,maxJobs=1))

 globalMap = list(set.union(*qs) for qs in zip(timeMap1,timeMap2,timeMap3))
 print(available(globalMap),maxJobs=3)

Поместите все это в класс TimeMap (вместо отдельных функций), и у вас должен быть довольно хороший набор инструментов для работы.

...