Несколько пользовательских потоков, извлекающих один и тот же элемент из очереди - PullRequest
0 голосов
/ 11 января 2019

Я создал поток производителя, который имеет несколько потребительских потоков. Я использовал очередь, где продюсер ставит все задачи. Затем потребительские потоки используют get для извлечения задач и их выполнения. Проблема, с которой я продолжаю сталкиваться, заключается в том, что некоторые элементы в очереди захватываются несколькими потоками потребителей.

В своем классе Automator я создаю пул потоков, затем помещаю все задачи в очередь, используя метод add_task в TaskThreadPool. Затем TaskRunners начинают вытягивать все задачи из очереди и запускать их. После завершения первого цикла каждый последующий цикл каждый потребительский поток будет захватывать последний элемент одновременно. Так что в этом случае я вижу несколько запущенных задач, когда нужно запустить только 1. Я пытался добавить блокировки в потребительский поток непосредственно перед выполнением _q.get, но я все еще получаю тот же результат. Как я могу гарантировать, что только один поток захватит задачи и не позволит другому потоку прикоснуться к нему.

Console Screenshot https://imgur.com/a/UN4K0Z7

class TaskThreadPool:
    """ Pool of threads consuming tasks for a queue """

    def __init__(self, num_threads):
        self.num_threads = num_threads
        self._q = Queue()
        self.workers = []

    def create_threads(self):
        for _ in range(self.num_threads):
            self.workers.append(TaskRunner(self._q))
        with print_lock:
            print('{} tasks threads created'.format(len(self.workers)))

    def add_task(self, task):
        """ Add a tasks to the queue """
        self._q.put(task)

    def wait_completion(self):
        """ Wait for completion of all the tasks in the queue """
        self._q.join()


class TaskRunner(Thread):
    """Thread executing tasks from a given tasks queue"""

    def __init__(self, queue):
        super(TaskRunner, self).__init__(daemon=True)
        self._q = queue
        self.start()

    def run(self):
        while True:
            if not self._q.empty():
                task = self._q.get()
                try:
                    task.run_task()
                finally:
                    self._q.task_done()


class Automator:
    def __init__(self, test_task_id=None, test_loop_count=0):
        """
        Automator Settings
        Basic start up settings, when testing a single task no Thread Pool is created.
        If Module Testing is enabled on test tasks will be run
        """
        # Specific Task testing information
        self.test_task_id = test_task_id
        self.task_test = False
        self.test_loop_count = test_loop_count
        self.print_lock = print_lock
        print('Starting Automator 3')

        ...


        # Queue information
        self.cycle_queue = []
        self.priority_queues = {}
        self.number_of_priority_queues = 0
        # Max number of threads to have running
        self.max_task_num_threads = 7
        self.threads_created = False
        self.task_pool = TaskThreadPool(self.max_task_num_threads)

        ...

        # If a test task id was provided turn task_test on
        if self.test_task_id:
            self.task_test = True

    def open_thread_pool(self):
        with self.print_lock:
            print('Creating Task Threads')
        self.task_pool.create_threads()
        self.threads_created = True

    ...

    ...

    def _run_cycle_queue(self):
        print('Running Cycle Tasks')
        for cycle_task in self.cycle_queue:
            self.task_pool.add_task(cycle_task)
        self.task_pool.wait_completion()

    def _run_standard_task_queues(self):
        """
        Loop through each task in a queue list and add task to queue
        """
        print('Running Standard Tasks')
        for queue_number in range(self.number_of_priority_queues):
            queue = self.priority_queues[str(queue_number)]

            if len(queue) > 0:
                for task in queue:
                    self.task_pool.add_task(task)
                self.task_pool.wait_completion()

    def _sleep(self):
        """
        Find when the next 5 minute interval. (10:00, 10:05, 10:10)
        Sleep till next 5 minute interval begins
        """
        now = dt.datetime.now()
        # How long until next run interval
        minutes_to_sleep = 5 - now.minute % 5
        print('Automator 3 Restarting in {} minutes'.format(minutes_to_sleep))
        time.sleep((minutes_to_sleep * 60) - now.second)
        now = dt.datetime.now()
        print('Automator 3 Restarting {}'.format(now))

    def run_automator(self):
        # Start Program Loop
        cycles = 0
        mode_print = False

        # Open Database Connection        
        self.dw.open_connection()
        try:
            while True:
                cycles += 1
                print('Cycle {} Started'.format(cycles))
                try:
                    # Get tasks from automator table
                    self._refresh_task_data()

                    # Update meta data status
                    self._status_running()

                    if not self.task_test:
                        # Backup Local Files
                        self.backup_files()

                    # Create Task Objects
                    self._create_task_objects()

                    # Create Task Priorities
                    self._check_priorities()

                    if self.task_test:

                        # Start up requested task
                        self.test_task(self.test_task_id)
                        if not self.test_loop_count \
                                or cycles == self.test_loop_count:
                            break

                    else:
                        if not mode_print:
                            print('Running Automator 3 - MODE: Standard')
                            mode_print = True
                        # Sort Tasks into Lists
                        self._setup_queues()

                        if not self.threads_created:
                            # Create Task Threads
                            self.open_thread_pool()

                        # Run Cycle tasks
                        self._run_cycle_queue()

                        # Setup Task queues and execute all tasks
                        self._run_standard_task_queues()

                        # Update the last run in meta data
                        self._update_last_run()

                        # Update meta data status
                        self._status_sleeping()

                        print('Cycle {} Completed'.format(cycles))

                        # Sleep till next 5 minute interval 12:00, 12:05, etc    
                        self._sleep()

                except Exception as e:
                    raise e

        finally:
            self.dw.close_connection()

1 Ответ

0 голосов
/ 15 января 2019

Проблема заключалась в том, что мой метод передавал предметы в очередь. Я добавил дополнительную проверку, чтобы остановить добавление ненужных элементов в очередь.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...