Python threading.Event () - Обеспечение пробуждения всех ожидающих потоков на event.set () - PullRequest
10 голосов
/ 05 августа 2010

У меня есть несколько потоков, которые ожидают события, выполняют какое-то действие, а затем снова ждут события.Другой поток вызовет событие, когда это будет уместно.

Я не могу найти способ гарантировать, что каждый ожидающий поток срабатывает ровно один раз после установки события.В настоящее время у меня есть триггерный поток, установил его, немного поспал, затем очистил его.К сожалению, это приводит к тому, что ожидающие потоки захватывают заданное событие много раз или вообще ничего.

Я не могу просто заставить поток запуска порождать потоки ответов, чтобы запустить их один раз, потому что они являются ответами на запросы, сделанные из других мест.

Короче говоря: в Python, как я могу иметьпоток установил событие и удостоверился, что каждый ожидающий поток воздействует на событие ровно один раз до его очистки?

Обновление:

Я пытался настроить его с помощьюблокировка и очередь, но это не работает.Вот что у меня есть:

# Globals - used to synch threads
waitingOnEvent = Queue.Queue
MainEvent = threading.Event()
MainEvent.clear()    # Not sure this is necessary, but figured I'd be safe
mainLock = threading.Lock()

def waitCall():
    mainLock.acquire()
    waitingOnEvent.put("waiting")
    mainLock.release()
    MainEvent.wait()
    waitingOnEvent.get(False)
    waitingOnEvent.task_done()
    #do stuff
    return

def triggerCall():
    mainLock.acquire()
    itemsinq = waitingOnEvent.qsize()
    MainEvent.set()
    waitingOnEvent.join()
    MainEvent.clear()
    mainLock.release()
    return

В первый раз itemsinq правильно отражает количество ожидающих вызовов, но только первый ожидающий поток, который совершит вызов, пройдет через него.С этого момента itemsinq всегда равен 1, и ожидающие потоки сменяются;каждый раз, когда происходит вызов триггера, происходит следующее.

Обновление 2 Кажется, что только один из потоков event.wait () пробуждается , и все жеqueue.join () работает.Это наводит меня на мысль, что один ожидающий поток просыпается, берет из очереди и вызывает task_done (), и что единственный get () / task_done () каким-то образом очищает очередь и разрешает join ().Затем триггерный поток завершает join (), очищает событие и, таким образом, предотвращает прохождение других ожидающих потоков.Почему очередь регистрируется как пустая / завершенная только после одного вызова get / task_done?

Кажется, что просыпается только один, даже если я закомментирую queue.get () и очередь.task_done () и повесьте триггер, чтобы он не мог очистить событие.

Ответы [ 4 ]

9 голосов
/ 10 августа 2010

Вам не нужно событие, и вам не нужны и замок, и очередь.Все, что вам нужно, это Очередь.

Вызов queue.put, чтобы отправить сообщение, не дожидаясь его доставки или обработки.

Вызов queue.get в рабочем потоке, чтобы дождатьсяприбытие сообщения.

import threading
import Queue

active_queues = []

class Worker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.mailbox = Queue.Queue()
        active_queues.append(self.mailbox)

    def run(self):
        while True:
            data = self.mailbox.get()
            if data == 'shutdown':
                print self, 'shutting down'
                return
            print self, 'received a message:', data

    def stop(self):
        active_queues.remove(self.mailbox)
        self.mailbox.put("shutdown")
        self.join()


def broadcast_event(data):
    for q in active_queues:
        q.put(data)

t1 = Worker()
t2 = Worker()
t1.start()
t2.start()
broadcast_event("first event")
broadcast_event("second event")
broadcast_event("shutdown")

t1.stop()
t2.stop()

Сообщения не обязательно должны быть строками;они могут быть любым объектом Python.

3 голосов
/ 05 августа 2010

Если вам нужны дискретные атомарные события, которые могут обрабатываться последовательно каждым потоком, то сделайте так, как предложили krs1 & bot403, и используйте очередь. Класс Python Queue синхронизирован - вам не нужно беспокоиться о блокировке, чтобы использовать его.

Если, однако, ваши потребности проще (событие говорит вам, что у вас есть данные для чтения и т. Д.), Вы можете подписать / зарегистрировать свои потоки в качестве наблюдателей объекта, ответственного за запуск событий. Этот объект будет вести список объектов-наблюдателей threading.Event. Для триггера он может затем вызвать set () для всех threading.Event объектов в списке.

2 голосов
/ 05 августа 2010

Одним из решений, которое я использовал в прошлом, является класс Queue для связи между потоками.Он является поточно-ориентированным и может использоваться для облегчения связи между потоками при использовании как многопроцессорных, так и многопоточных библиотекВы могли бы иметь дочерние потоки, ожидающие чего-то, чтобы войти в очередь и затем обработать новую запись.Класс Queue также имеет метод get (), который принимает удобный аргумент блокировки.

1 голос
/ 05 августа 2010

Я не программист на Python, но если событие может быть обработано только один раз, возможно, вам нужно переключиться на очередь сообщений с соответствующей блокировкой, чтобы при пробуждении одного потока и получении сообщения о событии он обрабатывал и удалялэто из очереди, поэтому его нет, если другие потоки проснутся и заглянут в очередь.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...