Рассчитать текущий статус запущенного потока из ThreadPoolExecutor - PullRequest
0 голосов
/ 08 ноября 2019

Я использую ThreadPoolExecutor для создания пула максимум из 15 работников. также в то же время я опрашиваю задачи из источника.

позволяет предположить, что у меня есть 1000 задач для выполнения. а у меня 15 рабочих. Таким образом, первые 15 заданий получают 15 рабочих. Теперь, когда задача завершена, я хочу снова опросить 16-ю задачу и так далее.

Я использую следующий код.

executor=ThreadPoolExecutor(max_workers=15)
future=executor.submit(task_todo,task_details)

Как узнать, сколько потоков в настоящее время в рабочем состоянии из 15 рабочих? Короче говоря, я хочу знать, как я могу узнать, что 1 задание выполнено?

Я хочу непрерывно опрашивать задания, основываясь на количестве свободных работников.

1 Ответ

0 голосов
/ 08 ноября 2019

ThreadPoolExecutor - это упрощенная версия threading.Thread, если вы хотите узнать количество активных потоков и количество оставленных заданий, которые вы должны использовать threading.Thread, следуйте комментариям в коде ниже и не стесняйтесь задавать вопросы:

from threading import Thread, current_thread, Lock, active_count
from queue import Queue
import time


class TasksCounter:
    """This class counts number of left tasks"""
    def __init__(self, tasks):
        self.tasks = tasks


def worker(_q, _lock, _counter):
    """
    :param _q: Queue is used to take tasks from main thread
    :param _lock: is used to prevent several Queues to access TaskCounter simultaneously, otherwise problems
    :param _counter: # instance of TaskCounter class
    :return: None
    """
    while True:
        data = _q.get()  # get task from Queue
        if data == "STOP":
            break  # if poison pillow, stop thread

        time.sleep(2)  # simulate some work

        with _lock:  # block counter and decrease number of tasks
            _counter.tasks -= 1
            print("Tasks number {}, message from {}".format(_counter.tasks, current_thread().getName()))
            # the last thread number you'll get will be two, since there is also main thread:
            print("Information from: {}. Active threads number: {}".format(current_thread().getName(), active_count()))


if __name__ == "__main__":
    print("Active threads number: {}".format(active_count()))  # show number of active threads
    thread_number = 6
    q = Queue()
    lock = Lock()
    counter = TasksCounter(100)
    for i in range(100):  # create 100 tasks
        q.put(i)
    for j in range(thread_number):  # stop threads
        q.put("STOP")
    # create threads to work with the tasks
    workers = [Thread(target=worker, args=(q, lock, counter,), name="Thread-{}".format(i)) for i in range(thread_number)]
    for w in workers:
        w.start()

...