Должен ли я использовать события, семафоры, блокировки, условия или их комбинации для безопасного выхода из многопоточной программы Python? - PullRequest
0 голосов
/ 07 июня 2018

Я пишу многопоточную программу на python, в которой основной поток и другие потоки, которые он создает, работают как демоны (но не с Thread.daemon = True), которые ищут определенные файлы в определенных каталогах и выполняют с ними действия, когда они существуют.,Возможно, что в одном / любом из потоков возникнет ошибка, которая потребует завершения всей программы.Однако мне нужно, чтобы другие потоки завершили свою текущую работу перед выходом.

Из того, что я понимаю, если я установлю myThread.daemon = True для порожденных мной потоков, они автоматически завершатся сразу после выхода из основного потока.Тем не менее, я хочу, чтобы другие потоки закончили свою текущую работу перед выходом (если ошибка не является своего рода катастрофическим сбоем, в этом случае я, вероятно, просто все равно выйду из системы, безопасно или нет).Поэтому я не устанавливаю для свойства daemon значение True для потоков.

Глядя на документацию модуля потоков и различные доступные мне объекты, такие как события, семафоры, условия и блокировки, я не уверен, что лучше всего справиться со своей ситуацией.Кроме того, я не уверен, как справиться с этим сценарием, когда программа должна завершиться из-за сигналов SIGTERM / SIGINT.

Некоторый код, который иллюстрирует упрощенную версию структуры моей программы:

import threading
import signals
import glob
import time

class MyThread1( threading.thread ):
    def __init__( self, name='MyThread1' ):
        threading.Thread.__init__( self )
        self.name = name
        return
    def run( self ):
        while True:
            filePathList = glob.glob( thisThreadDir + '/*.txt' )
            for file in filePathList:
                try:
                    doSomeProcessing( file )
                    # Then move the file to another thread's dir
                    # or potentially create a new file that will 
                    # be picked up by another thread
                except:
                    # Need to potentially tell all other threads
                    # to finish task and exit depending on error

            # I assume this would be the place to check for some kind of
            # flag or other indication to terminate the thread?
            time.sleep( 30 )


# Now imagine a few more custom threads with the same basic structure, 
# except that what is happening in doSomeProcessing() will obviously vary

# Main Thread/Script
def sigintHandler( SIGINT, frame ):
    # How do I handle telling all threads to finish their current loop
    # and then exit safely when I encounter this signal?
    sys.exit( 1 )

def sigtermHandler( SIGTERM, frame ):
    # Same question for this signal handler
    sys.exit( 1 )

signal.signal( signal.SIGINT, sigintHandler )
signal.signal( signal.SIGTERM, sigtermHandler )

myOtherThread1 = MyThread1()
myOtherThreadN = MyThreadN()

myOtherThread1.start()
myOtherThreadN.start()

while True:
    filePathList = glob.glob( mainDir + '/*.txt' )
    for file in filePathList:
        try:
            doMainProcessing( file )
            # Move file or write a new one in another thread's dir
        except:
            # Again, potentially need to exit the whole program, but want 
            # the other threads to finish their current loop first 

    # Check if another thread told us we need to exit?
    time.sleep( 30 )

1 Ответ

0 голосов
/ 07 июня 2018

Я бы использовал Event , чтобы сообщить потоку, что он должен выйти :

  • создать событие в __init__
  • использоватьwait() в run() для sleep и для проверки, когда выходить
  • установить событие извне, чтобы остановить поток

Для обработки исключений в пределахнить , у меня будет блок try / except вокруг всего, что он делает.Когда что-то перехватывается, сохраните исключение (и / или любую другую необходимую вам информацию), очистите и выйдите из потока.

Снаружи, в основном потоке, проверьте наличие исключений во всех потоках, если таковые имеютсяОбнаружено исключение, дайте сигнал всем потокам, что они должны выйти.

Для обработки исключений в основном потоке (который включает в себя также SIGINT), есть try / exceptзаблокируйте там и подайте сигнал всем потокам об остановке.

Все вместе, с фиктивными исключениями и отладочными отпечатками:

import threading
import time

class MyThread(threading.Thread):
    def __init__(self):
        super().__init__()
        self.stop_requested = threading.Event()
        self.exception = None

    def run(self):
        try:
            # sleep for 1 second, or until stop is requested, stop if stop is requested
            while not self.stop_requested.wait(1):
                # do your thread thing here
                print('in thread {}'.format(self))

                # simulate a random exception:
                import random
                if random.randint(0, 50) == 42:
                    1 / 0
        except Exception as e:
            self.exception = e

        # clean up here
        print('clean up thread {}'.format(self))

    def stop(self):
        # set the event to signal stop
        self.stop_requested.set()

# create and start some threads
threads = [MyThread(), MyThread(), MyThread(), MyThread()]
for t in threads:
    t.start()

# main thread looks at the status of all threads
try:
    while True:
        for t in threads:
            if t.exception:
                # there was an error in a thread - raise it in main thread too
                # this will stop the loop
                raise t.exception
        time.sleep(0.2)

except Exception as e:
    # handle exceptions any way you like, or don't
    # This includes exceptions in main thread as well as those in other threads
    # (because of "raise t.exception" above)
    print(e)

finally:
    print('clan up everything')
    for t in threads:
        # threads will know how to clean up when stopped
        t.stop()
...