Передать очередь для сторожевого потока в python - PullRequest
0 голосов
/ 06 марта 2019

У меня есть класс Indexer, который создается из основного потока, экземпляр этого класса хранится в переменной, скажем, indexer. watchdog.observers.Observer() наблюдает за изменениями в каталогах, которые происходят в другом потоке. Я попытался передать эту переменную indexer из основного потока через мой обработчик Vigilante, который был передан в ob.schedule(Vigilante(indexer)) вместе с некоторыми другими переменными из основного потока. Я не могу получить доступ к переменной indexer в Vigilante class из-за нахождения в разных потоках. Я знаю, что могу использовать Queue, но я не знаю, как передать Queue нити сторожевого таймера.

Вот код из основного потока:

if watch:
    import watchdog.observers
    from .utils import vigilante
    class callbacks:
        def __init__(self):
            pass
        @staticmethod
        def build(filename, response):
            return _build(filename, response)
        @staticmethod
        def renderer(src, mode):
            return render(src, mode)
    handler = vigilante.Vigilante(_filter, ignore, Indexer, callbacks, Mode)
    path_to_watch = os.path.normpath(os.path.join(workspace, '..'))
    ob = watchdog.observers.Observer()
    ob.schedule(handler, path=path_to_watch, recursive=True)
    ob.start()
    import time
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        ob.stop()
        Indexer.close()
    ob.join()

Indexer class предназначен для записи в базу данных из другой части кода, где был создан экземпляр Indexer.

Вот код из Vigilante класса, работающего в потоке сторожевого таймера:

class Vigilante(PatternMatchingEventHandler):
    """Helps to watch files, directories for changes"""
    def __init__(self, pattern, ignore, indexer, callback, mode):
        pattern.append("*.yml")
        self.Callback = callback
        self.Mode = mode
        self.Indexer = indexer
        super(Vigilante, self).__init__(patterns=pattern, ignore_directories=ignore)

    def vigil(self, event):
        print(event.src_path, 'modified')
        IndexReader = self.Indexer.get_index_on(event.src_path)
        dep = IndexReader.read_index()
        print(dep.next(), 'dependency')
        feedout = self.Callback.build(
            os.path.basename(event.src_path)
            ,self.Callback.renderer(event.src_path, self.Mode.FILE_MODE)
        )

    def on_modified(self, event):
        self.vigil(event)

    def on_created(self, event):
        self.vigil(event)

Все, что мне нужно, - это способ передать эти переменные из основного потока в поток сторожевого таймера через класс Vigilante

Ответы [ 2 ]

0 голосов
/ 07 марта 2019

Я наконец нашел способ сделать это, не пересекая потоки, как прежде, с идеей, основанной на ответе @EvertW. Я передал Queue из основного потока в класс Vigilante, который находился в другом потоке, поэтому каждый измененный файл будет помещен в Queue, а затем из основного потока я получил измененный файл из очереди, считывание из базы данных индексатора, и все другие задачи, которые необходимо выполнить методом Vigilante.vigil, были перемещены в основной поток, поскольку эти задачи зависят от измененного файла и того, что читается из базы данных индексатора.

Эта ошибка исчезла:

Объекты SQLite, созданные в потоке, могут использоваться только в том же потоке. Объект был создан в идентификаторе потока 9788, а это идентификатор потока 4288.

Вот фрагмент из того, что я сделал:

....
q = Queue.LifoQueue(10)
handler = vigilante.Vigilante(q, _filter, ignore)
path_to_watch = os.path.normpath(os.path.join(workspace, '..'))
ob = watchdog.observers.Observer()
ob.schedule(handler, path=path_to_watch, recursive=True)
ob.start()
import time
try:
    while True:
        if not q.empty():
            modified = q.get()
            IndexReader = Indexer.get_index_on(modified)
            deps = IndexReader.read_index()
            print(deps.next(), 'dependency')
            # TODO
        else:
            print('q is empty')
            time.sleep(1)
except KeyboardInterrupt:
    ob.stop()
    Indexer.close()
ob.join()

Класс Виджиланте:

class Vigilante(PatternMatchingEventHandler):
    """Helps to watch files, directories for changes"""
    def __init__(self, q, pattern, ignore):
        self.q = q
        super(Vigilante, self).__init__(
            patterns=pattern, 
            ignore_patterns=ignore, 
            ignore_directories=True
        )

    def vigil(self, event):
        print(event.src_path, 'modified')
        self.q.put(event.src_path)

    def on_modified(self, event):
        self.vigil(event)

    def on_created(self, event):
        self.vigil(event)
....

PS: Слово совета: мое слово совету тому, кто сталкивался с подобными проблемами с потоками в watchdog; «Не доверяйте потоку сторожевого таймера выполнять задачи с измененными файлами, просто извлеките измененные файлы и делайте с ними все, что угодно, кроме простой задачи».

0 голосов
/ 06 марта 2019

Вы можете попробовать шаблон Observer (без каламбура), т. Е. Позволить классу Observer иметь список слушателей, которые он будет информировать о любых изменениях, которые он увидит. Затем позвольте индексатору объявить о своей заинтересованности в Observer.

В моем примере Observer ожидает, что подписчики будут вызываться, которые получают изменения. Тогда вы можете сделать:

from queue import Queue
class Observable:
    def __init__(self):
        self.listeners = []
    def subscribe(listener):
        self.listeners.append(listener)
    def onNotify(change):
        for listener in self.listeners:
            listener(change)

class Indexer(Thread):
    def __init__(self, observer):
        Thread.__init__(self)
        self.q = Queue()
        observer.subscribe(self.q.put)
    def run(self):
        while True:
            change = self.q.get()

Поскольку стандартная очередь полностью поточнобезопасна, это будет нормально работать.

...