Класс threading.Lock
предлагает (не документированный) метод .locked()
, который возвращает True
, если блокировка заблокирована, и False
, если нет.Теперь я хочу написать код, подобный этому:
Тема 1:
def run():
while True:
with mylock:
dostuff()
def configure_dostuff(args):
assert mylock.locked()
changeconfig_of_dostuff_which_cant_happen_when_it_runs(args)
Теперь Тема 2 должна делать:
with mylock:
configure_dostuff(args)
Итакэто может безопасно изменить то, что делает Поток 1.Однако .locked()
никогда не проверяет , у кого есть блокировка, поэтому, если поток 2 вызывает configure_dostuff()
без получения блокировки (из-за ошибки программирования), утверждение может не вызвать ошибку, потому что в настоящий момент поток1 имеет блокировку.
AFAIK Нет метода Lock.haslock()
, который возвращает вас, только если вызывающий поток имеет блокировку.Любая идея о том, как реализовать это потокобезопасным способом?