Как ждать будущего, которое еще не было отправлено в пул потоков? - PullRequest
1 голос
/ 20 ноября 2019

Я хотел бы создать «TaskPoolManager», который запускает «Задачу» (пользовательский объект) в ThreadPoolExecutor и назначает им приоритет по уровню важности, времени с момента отправки и т. Д. (Это свойства задачи)

Моя проблема в том, что когда ThreadPoolExecutor заполнен, другая задача, отправленная в пул, будет выполнена в режиме "FIFO" и не будет иметь приоритетов.

Здесь класс TaskPoolManager:


class TaskPoolManager:
    def __init__(self, max_workers: int = None):
        self.max_workers = max_workers or (os.cpu_count() or 1) * 5
        self._pool_executor = ThreadPoolExecutor(max_workers=self.max_workers, 
                                                 thread_name_prefix="TaskPoolManager")
        self.pending_task: Dict[Task, Future] = {Task(func=None): Future()}
        self.running_workers = 0

    # Task are callable
    def submit(self, task: Task) -> Future:
        if self.running_workers == self.max_workers:
            return self._add_task_to_queue(task)
        else:
            return self._start_task(task)

    def _start_task(self, task: Task) -> Future:
        """Submit a task in the pool"""
        self.running_workers = self.running_workers + 1
        future = self._pool_executor.submit(task)
        future.add_done_callback(lambda x: self._completed_thread())
        return future

    def _add_task_to_queue(self, task: Task) -> Future:
        """Add task to the not started task queue"""
        not_started_future = Future()
        self.pending_task[task] = not_started_future
        return not_started_future

    def _completed_thread(self):
        """Call when a thread in the pool as terminated a task"""
        self.running_workers = self.running_workers - 1
        self._start_task_in_queue()  # By priority level

Вот пример того, как его использовать:

manager = TaskPoolManager()

for i in range(0, 10000):
    manager.submit(Task(func=wait_random_time_task))

f = manager.submit(Task(func=wait_random_time_task))

# This isn't submitted to the thread pool yet, but need to be waitable like it is.
f.result()

Есть ли способ подключить экземпляр клиента Future в Future экземпляр, созданный ThreadPoolExecutor.submit позже в ходе выполнения?

Если нет, есть ли способ вернуть Future подобный объект, который может быть связан с будущим позже и все еще ждатьдля .result()?

Другими словами: как ждать будущее, которое еще не было отправлено в пул потоков?

...