условная блокировка python - PullRequest
0 голосов
/ 09 марта 2009

Как реализовать условную блокировку в многопоточном приложении, например, у меня есть? 30 потоков, которые вызывают функцию, и в большинстве случаев доступ ко всем потокам возможен одновременно, но в зависимости от входных данных функции может быть условие, когда только один поток может сделать это одним действием. (Если значение для ввода повторяется и какой-то поток все еще работает, тогда мне нужна блокировка.)

Я теперь, когда есть потоковая работа модуля с Rlock (), но я не знаю, как использовать его так, как я описал в первой части.

Редактировать : На самом деле вопрос заключается в том, как запретить двум любым потокам запускать одну и ту же функцию с одним и тем же аргументом одновременно. (Спасибо Дэвиду за помощь в формулировании моего вопроса :))

Ответы [ 2 ]

5 голосов
/ 09 марта 2009

Попробуйте: запишите модуль в том месте, где находится ваша функция, и, если вход в функцию таков, что требуется блокировка, получите блокировку внутри функции. В противном случае не надо.

l = threading.RLock()

def fn(arg):
    if arg == arg_that_needs_lock:
        l.acquire()
        try:
            # do stuff
        finally:
            l.release()
    else:
        # do other stuff

EDIT

Насколько я могу судить сейчас, вопрос заключается в том, как запретить двум любым потокам одновременно выполнять одну и ту же функцию с одним и тем же аргументом. Однако нет проблем с двумя потоками, выполняющими одну и ту же функцию с разными аргументами одновременно. Простой способ сделать это, если все допустимые аргументы функции могут быть словарными ключами, - создать словарь аргументов для блокировок:

import threading

dict_lock = threading.RLock()
locks = {}

def fn_dict(arg):
    dict_lock.acquire()
    try:
        if arg not in dict:
            locks[arg] = threading.RLock()
        l = locks[arg]
    finally:
        dict_lock.release()
    l.acquire()
    try:
        # do stuff
    finally:
        l.release()

Если ваша функция может быть вызвана с множеством разных аргументов, это будет означать много блокировок. Вероятно, лучшим способом было бы иметь набор всех аргументов, с которыми в данный момент выполняется функция, и защитить содержимое этого набора с помощью блокировки. Я думаю, что это должно работать:

set_condition = threading.Condition()
current_args = set()

def fn_set(arg):
    set_condition.acquire()
    try:
        while arg in current_args:
            set_condition.wait()
        current_args.add(arg)
    finally:
        set_condition.release()
    # do stuff
    set_condition.acquire()
    try:
        current_args.remove(arg)
        set_condition.notifyAll()
    finally:
        set_condition.release()
1 голос
/ 09 марта 2009

Звучит так, будто вы хотите что-то похожее на блокировку Readers-Writer .

Это, вероятно, не то, что вы хотите, но может быть подсказкой:

from __future__ import with_statement
import threading

def RWLock(readers = 1, writers = 1):
    m = _Monitor(readers, writers)
    return (_RWLock(m.r_increment, m.r_decrement), _RWLock(m.w_increment, m.w_decrement))

class _RWLock(object):
    def __init__(self, inc, dec):
        self.inc = inc
        self.dec = dec

    def acquire(self):
        self.inc()
    def release(self):
        self.dec()
    def __enter__(self):
        self.inc()
    def __exit__(self):
        self.dec()

class _Monitor(object):
    def __init__(self, max_readers, max_writers):
        self.max_readers = max_readers
        self.max_writers = max_writers
        self.readers = 0
        self.writers = 0
        self.monitor = threading.Condition()

    def r_increment(self):
        with self.monitor:
            while self.writers > 0 and self.readers < self.max_readers:
                self.monitor.wait()
            self.readers += 1
            self.monitor.notify()

    def r_decrement(self):
        with self.monitor:
            while self.writers > 0:
                self.monitor.wait()
            assert(self.readers > 0)
            self.readers -= 1
            self.monitor.notify()

    def w_increment(self):
        with self.monitor:
            while self.readers > 0 and self.writers < self.max_writers:
                self.monitor.wait()
            self.writers += 1
            self.monitor.notify()

    def w_decrement(self):
        with self.monitor:
            assert(self.writers > 0)
            self.writers -= 1
            self.monitor.notify()

if __name__ == '__main__':

    rl, wl = RWLock()
    wl.acquire()
    wl.release()
    rl.acquire()
    rl.release()

(к сожалению, не проверено)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...