Как корректно завершить многопоточное приложение Python, использующее queue.Queue - PullRequest
0 голосов
/ 04 июля 2018

Я уже давно пытаюсь изящно завершить свою заявку, но пока ни один из найденных ответов не сработал.

Пример кода ниже иллюстрирует структуру моего приложения. По сути, это цепочка потоков, которая передает данные друг другу с помощью очередей.

from abc import abstractmethod
from time import sleep
from threading import Thread, Event
from queue import Queue
import signal
import sys


class StoppableThread(Thread):

    def __init__(self):
        super().__init__()
        self.stopper = Event()
        self.queue = Queue()

    @abstractmethod
    def actual_job(self):
        pass

    def stop_running(self):
        self.stopper.set()


    def run(self):
        while not self.stopper.is_set():
            print(self.stopper.is_set())
            self.actual_job()
        self.queue.join()

class SomeObjectOne(StoppableThread):
    def __init__(self, name, some_object_two):
        super().__init__()
        self.name = name
        self.obj_two = some_object_two

    def actual_job(self):
        # print('{} is currently running'.format(self.name))
        input_string = 'some string'
        print('{} outputs {}'.format(self.name, input_string))
        self.obj_two.queue.put(input_string)
        sleep(2)

class SomeObjectTwo(StoppableThread):
    def __init__(self, name, some_object_three):
        super().__init__()
        self.name = name
        self.some_object_three = some_object_three


    def actual_job(self):
        # print('{} is currently running'.format(self.name))
        some_string = self.queue.get()
        inverted = some_string[::-1]
        print('{} outputs {}'.format(self.name , inverted))
        self.some_object_three.queue.put(inverted)
        sleep(2)


class SomeObjectThree(StoppableThread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def actual_job(self):
        print('{} is currently running'.format(self.name))
        some_string = self.queue.get()
        print('{} outputs {}'.format(self.name ,some_string[::-1]))
        sleep(2)




class ServiceExit(Exception):
    """
    Custom exception which is used to trigger the clean exit
    of all running threads and the main program.
    """
    pass

def service_shutdown(signum, frame):
    print('Caught signal %d' % signum)
    raise ServiceExit

signal.signal(signal.SIGTERM, service_shutdown)
signal.signal(signal.SIGINT, service_shutdown)

if __name__ == '__main__':
    thread_three = SomeObjectThree('SomeObjectThree')
    thread_two = SomeObjectTwo('SomeObjectTwo', thread_three)
    thread_one = SomeObjectOne('SomeObjectOne', thread_two)

    try:
        thread_three.start()
        thread_two.start()
        thread_one.start()

        # Keep the main thread running, otherwise signals are ignored.
        while True:
            sleep(0.5)

    except ServiceExit:
        print('Running service exit')
        thread_three.stop_running()
        thread_two.stop_running()
        thread_one.stop_running()
        thread_one.join()
        thread_two.join()
        thread_three.join()
        sys.exit(0)

Теперь, если я запускаю этот код и Ctrl-C для завершения, thread_one, кажется, присоединяется, как и ожидалось, но код застревает на thread_two.join().

Поскольку thread_one - единственный поток с непрерывной пустой очередью, я ожидаю, что он как-то связан с очередью.

Есть идеи?

1 Ответ

0 голосов
/ 04 июля 2018

В run() методе StoppableThread у вас есть это:

self.queue.join()

join() - это метод блокировки :

Блокируется до тех пор, пока все элементы в очереди не будут получены и обработаны.

Количество незавершенных задач увеличивается при каждом добавлении элемента в очередь. Счетчик уменьшается, когда пользовательский поток вызывает task_done (), чтобы указать, что элемент был извлечен и все работают над ним завершено. Когда количество незавершенных задач падает до нуля, join () разблокирует.

Таким образом, для возврата join() недостаточно get() элемента в другом потоке, вы также должны указать, что он был обработан с помощью task_done():

from abc import abstractmethod
from time import sleep
from threading import Thread, Event
from queue import Queue
import signal
import sys

class StoppableThread(Thread):

    def __init__(self):
        super().__init__()
        self.stopper = Event()
        self.queue = Queue()

    @abstractmethod
    def actual_job(self):
        pass

    def stop_running(self):
        self.stopper.set()

    def run(self):
        while not self.stopper.is_set():
            print(self.stopper.is_set())
            self.actual_job()
        self.queue.join()

class SomeObjectOne(StoppableThread):
    def __init__(self, name, some_object_two):
        super().__init__()
        self.name = name
        self.obj_two = some_object_two

    def actual_job(self):
        # print('{} is currently running'.format(self.name))
        input_string = 'some string'
        print('{} outputs {}'.format(self.name, input_string))
        self.obj_two.queue.put(input_string)
        sleep(2)

class SomeObjectTwo(StoppableThread):
    def __init__(self, name, some_object_three):
        super().__init__()
        self.name = name
        self.some_object_three = some_object_three

    def actual_job(self):
        # print('{} is currently running'.format(self.name))
        some_string = self.queue.get()
        inverted = some_string[::-1]
        print('{} outputs {}'.format(self.name , inverted))
        self.queue.task_done()
        self.some_object_three.queue.put(inverted)
        sleep(2)

class SomeObjectThree(StoppableThread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def actual_job(self):
        print('{} is currently running'.format(self.name))
        some_string = self.queue.get()
        print('{} outputs {}'.format(self.name ,some_string[::-1]))
        self.queue.task_done()
        sleep(2)

class ServiceExit(Exception):
    """
    Custom exception which is used to trigger the clean exit
    of all running threads and the main program.
    """
    pass

def service_shutdown(signum, frame):
    print('Caught signal %d' % signum)
    raise ServiceExit

signal.signal(signal.SIGTERM, service_shutdown)
signal.signal(signal.SIGINT, service_shutdown)

if __name__ == '__main__':
    thread_three = SomeObjectThree('SomeObjectThree')
    thread_two = SomeObjectTwo('SomeObjectTwo', thread_three)
    thread_one = SomeObjectOne('SomeObjectOne', thread_two)

    try:
        thread_three.start()
        thread_two.start()
        thread_one.start()

        # Keep the main thread running, otherwise signals are ignored.
        while True:
            sleep(0.5)

    except ServiceExit:
        print('Running service exit')
        thread_three.stop_running()
        thread_two.stop_running()
        thread_one.stop_running()
        thread_one.join()
        thread_two.join()
        thread_three.join()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...