Заблокируйте один поток R, когда работает любой поток из набора {B} - PullRequest
0 голосов
/ 29 января 2019

Мое приложение имеет один долго работающий, загруженный ЦП поток, (примером которого является класс Runner), в то время как пул потоков должен быстро построить страницу и затем вернуть ее (например, класс Builder).Runner выполняет фоновое кодирование мультимедиа и не зависит от времени.Builder.build_stuff() работает быстро и чувствителен ко времени, поэтому я хочу заблокировать Runner от запуска любой новой работы, если Builder пытается запустить.

Я изначально использовал один threading.Lock() для блокировки Runner.do_work но это ограничивало несколько Builder s от запуска в одно время.Я сделал упрощенный пример ниже, чтобы показать стратегию, которую я придумал;но я не уверен на 100%, что это лучшее решение или что для этого не существует какой-либо конструкции.

import threading
runner = Runner()

"""Thread that is always running"""
def run():
    while True:
        runner.do_work()
running_work = threading.Thread(target=run).start()

"""Builders are created for a short duration by an outside pool of threads"""
class Builder:
    def __del__(self):
        runner.outside_release()

    def __init__(self):
        runner.outside_acquire()
        self.build_stuff()

    def build_stuff(self):
        """Do build some stuff here"""
        return """thing that was built"""

class Runner:
    def __init__(self):
        self.building_flag = 0
        self.building_lock = threading.Condition(threading.Lock())

    def outside_acquire(self):
        with runner.building_lock:
            runner.building_flag += 1
        self.build_stuff()

    def outside_release(self):
        with runner.building_lock:
            runner.building_flag -= 1
            runner.building_lock.notify()

    def do_work(self):
        with self.building_lock:
            while self.building_flag:
                self.building_lock.wait()
        """Do some work here"""

1 Ответ

0 голосов
/ 30 января 2019

Прежде всего, вы уже упомянули об этом, но я скажу это снова для всех, кто читает это: прерывать процесс в долгосрочном потоке и перезапускать его гораздо сложнее, чем разделять длительный процесс на куски и проверятьесли вы должны делать паузу так часто.

Я нашел ваш пример немного запутанным и неполным, поэтому я сделал отдельный пример, демонстрирующий, как я бы решил проблему, как описано.Я использовал простой список, защищенный threading.Lock, чтобы отслеживать всех работающих сборщиков, и threading.Event, чтобы уведомить бегуна о том, когда следует остановиться и возобновить работу.Наиболее важные разделы находятся до и после «работы» в методе run строителя, где он добавляет и удаляет себя в списке работающих строителей, а также определяет, было ли это последним завершением работы, о котором уведомляется бегун.чтобы начать снова.

from threading import Thread, Lock, Event
from time import sleep

class Builder(Thread):

    running = [] #list to keep track of any running builders
    running_lock = Lock() #mutex for running builder list

    def __init__(self, work, can_work_event):
        super().__init__()
        self.work = work
        self.runner_can_work = can_work_event


    def run(self):
        #before we do our work, add ourselves to running list and tell runner not to work
        self.runner_can_work.clear() #runner cannot start new work now
        with Builder.running_lock: #append and remove are not likely thread safe
            Builder.running.append(self) #add self to running builder list

        print(f'builder doing {self.work}')
        sleep(1)

        #this is not robust against builders crashing. Perhaps a better solution would
        #  keep track of thread id's, and periodically clean out id's of crashed threads
        #  from the Builder.running list

        #del isn't a reliable way ot determine when a task is done. Do this at the end
        #  of the work you intend to perform.

        with Builder.running_lock: #when builder is done with work
            Builder.running.remove(self) #remove self from list
            if not Builder.running: #no more builders are in the list
                self.runner_can_work.set() #allow runner to begin new work

class Runner(Thread):

    def __init__(self, work_to_do, can_work_event):
        super().__init__()
        self.work = work_to_do
        self.can_work = can_work_event

    def run(self):
        for chunk in self.work:
            self.can_work.wait() #wait on any running Builders
            print(f'runner working on {chunk}')
            sleep(2) #heavy computation

#example demonstration

runner_can_work = Event() #event used to notify runner of ability to work
runner_can_work.set() #set event to initially true (default is false)

r = Runner(range(10), runner_can_work) # range = dummy work to do
b1 = Builder('work 1', runner_can_work)
b2 = Builder('work 2', runner_can_work)
b3 = Builder('work 3', runner_can_work)
b4 = Builder('work 4', runner_can_work)
b5 = Builder('work 5', runner_can_work)

r.start()
sleep(3)
b1.start()
sleep(4)
b2.start()
b3.start()
sleep(3)
b4.start()
b5.start()

for t in (r,b1,b2,b3,b4,b5): t.join()
print('done')
...