Почему моя многопроцессорная очередь не является поточно-ориентированной? - PullRequest
0 голосов
/ 09 января 2019

Я создаю сторожевой таймер, который запускает другую программу на Python и, если ему не удается найти регистрацию в каком-либо из потоков, отключает всю программу. Это так, что в конечном итоге он сможет взять под контроль необходимые коммуникационные порты. Код для таймера выглядит следующим образом:

from multiprocessing import Process, Queue
from time import sleep
from copy import deepcopy

PATH_TO_FILE = r'.\test_program.py'
WATCHDOG_TIMEOUT = 2

class Watchdog:

    def __init__(self, filepath, timeout):
        self.filepath = filepath
        self.timeout = timeout
        self.threadIdQ = Queue()
        self.knownThreads = {}

    def start(self):
        threadIdQ = self.threadIdQ

        process = Process(target = self._executeFile)
        process.start()
        try:
            while True:
                unaccountedThreads = deepcopy(self.knownThreads)

                # Empty queue since last wake. Add new thread IDs to knownThreads, and account for all known thread IDs
                # in queue
                while not threadIdQ.empty():
                    threadId = threadIdQ.get()
                    if threadId in self.knownThreads:
                        unaccountedThreads.pop(threadId, None)
                    else:
                        print('New threadId < {} > discovered'.format(threadId))
                        self.knownThreads[threadId] = False

                # If there is a known thread that is unaccounted for, then it has either hung or crashed.
                # Shut everything down.
                if len(unaccountedThreads) > 0:
                    print('The following threads are unaccounted for:\n')
                    for threadId in unaccountedThreads:
                        print(threadId)
                    print('\nShutting down!!!')
                    break
                else:
                    print('No unaccounted threads...')

                sleep(self.timeout)

        # Account for any exceptions thrown in the watchdog timer itself
        except:
            process.terminate()
            raise

        process.terminate()


    def _executeFile(self):
        with open(self.filepath, 'r') as f:
            exec(f.read(), {'wdQueue' : self.threadIdQ})

if __name__ == '__main__':
    wd = Watchdog(PATH_TO_FILE, WATCHDOG_TIMEOUT)
    wd.start()

У меня также есть небольшая программа для проверки работоспособности сторожевого таймера

from time import sleep
from threading import Thread
from queue import SimpleQueue

Q_TO_Q_DELAY = 0.013

class QToQ:

    def __init__(self, processQueue, threadQueue):
        self.processQueue = processQueue
        self.threadQueue = threadQueue
        Thread(name='queueToQueue', target=self._run).start()

    def _run(self):
        pQ = self.processQueue
        tQ = self.threadQueue
        while True:
            while not tQ.empty():
                sleep(Q_TO_Q_DELAY)
                pQ.put(tQ.get())

def fastThread(q):
    while True:
        print('Fast thread, checking in!')
        q.put('fastID')
        sleep(0.5)

def slowThread(q):
    while True:
        print('Slow thread, checking in...')
        q.put('slowID')
        sleep(1.5)

def hangThread(q):
    print('Hanging thread, checked in')
    q.put('hangID')
    while True:
        pass

print('Hello! I am a program that spawns threads!\n\n')

threadQ = SimpleQueue()

Thread(name='fastThread', target=fastThread, args=(threadQ,)).start()
Thread(name='slowThread', target=slowThread, args=(threadQ,)).start()
Thread(name='hangThread', target=hangThread, args=(threadQ,)).start()

QToQ(wdQueue, threadQ)

Как видите, мне нужно поместить потоки в очередь. Очередь, в то время как отдельный объект медленно подает выходные данные очереди. Очередь в многопроцессорную очередь. Если вместо этого у меня есть потоки, помещенные непосредственно в многопроцессорную очередь, или у меня нет промежуточного состояния объекта QToQ, то многопроцессорная очередь будет заблокирована и всегда будет пустой на стороне сторожевого таймера.

Теперь, поскольку многопроцессорная очередь должна быть поточно-ориентированной и безопасной для обработки, я могу только предположить, что в реализации что-то испортилось. Мое решение, кажется, работает, но также кажется достаточно хакерским, и я чувствую, что должен это исправить.

Я использую Python 3.7.2, если это имеет значение.

1 Ответ

0 голосов
/ 10 января 2019

Я подозреваю, что test_program.py выходит.

Я изменил последние несколько строк на это:

tq = threadQ
# tq = wdQueue    # option to send messages direct to WD

t1 = Thread(name='fastThread', target=fastThread, args=(tq,))
t2 = Thread(name='slowThread', target=slowThread, args=(tq,))
t3 = Thread(name='hangThread', target=hangThread, args=(tq,))

t1.start()
t2.start()
t3.start()
QToQ(wdQueue, threadQ)

print('Joining with threads...')
t1.join()
t2.join()
t3.join()

print('test_program exit')

Вызов join() означает, что тестовая программа никогда не завершится сама по себе, поскольку ни один из потоков никогда не завершится.

Итак, как есть, t3 зависает, и сторожевая программа обнаруживает это и обнаруживает неучтенный поток и останавливает тестовую программу.

Если t3 удалено из вышеприведенной программы, то два других потока хорошо себя ведут, и сторожевая программа позволяет тестовой программе продолжаться бесконечно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...