Использование многопроцессорности для обработки очереди наблюдения - PullRequest
0 голосов
/ 08 апреля 2020

Я пытаюсь использовать многопроцессорную обработку для одновременной обработки нескольких сторожевых событий. Я работаю с кодом, который я нашел здесь Python параллельный поток, который использует события очереди Watchdog

Проблема, с которой я столкнулся в приведенном ниже коде, заключается в load_queue() pool.apply_aysync(print_func, (event,)) не t выполнить функцию print_func(), и я не могу понять, почему. Любая помощь будет великолепна.

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time
import datetime
import shutil
import multiprocessing as mp
from multiprocessing import Process
from multiprocessing import Queue
from multiprocessing import Pool
import threading

Incoming_Data = 'D:/Incoming_Data/'

class RMTNET_File_Handler(FileSystemEventHandler):

    def __init__(self,queue):
        FileSystemEventHandler.__init__(self)
        self.queue = queue

    def process(self, event):

        self.queue.put(event)

    def on_created(self, event): # when file is created

        self.process(event)
        now = datetime.datetime.now()
        print('{0} -- event {1} off the queue...'.format(now.strftime('%Y/%m/%d %H:%M:%S'), event.src_path))



def print_func(event):

    time.sleep(5)
    now = datetime.datetime.now()
    print('{0} -- Pulling {1} off the queue...'.format(now.strftime('%Y/%m/%d %H:%M:%S'), event.path))

def load_queue(watchdog_queue):

    while True:
        if not watchdog_queue.empty():
            event = watchdog_queue.get()
            pool = Pool()
            pool.apply_async(print_func, (event,))
            print('Finish load_queue')
        else:
            time.sleep(1)

if __name__ == '__main__':

    watchdog_queue = Queue()

    observer = Observer()
    event_handler = RMTNET_File_Handler(watchdog_queue)

    # set observer to use created handler in directory
    observer.schedule(event_handler, path=Incoming_Data)
    observer.start()

    worker = threading.Thread(target=load_queue, args=(watchdog_queue,))
    worker.setDaemon(True)
    print('About to start observer')
    worker.start()





    # sleep until keyboard interrupt, then stop + rejoin the observer
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()

observer.join()
...