Python параллельный поток, использующий события очереди Watchdog - PullRequest
1 голос
/ 11 марта 2020

У меня есть этот код, который должен помещать событие в очередь каждый раз, когда внешняя программа (TCPdump) создает файл * .pcap в моем каталоге. Моя проблема в том, что я всегда получаю пустую очередь, хотя я получаю функцию print из process ().

Что я делаю не так? Правильно ли определена очередь и разделена ли она между двумя классами?

РЕДАКТИРОВАТЬ -----------------
Возможно, я понял, почему получил пустую очередь, я думаю, это потому, что я печатаю очередь, которую я инициализировал, прежде чем она будет заполнена классом Handler. Я изменил свой код и создал два процесса, которые должны использовать одну и ту же очередь, но теперь выполнение зависает на queue.put () и поток ReadPcapFiles () перестает работать.

Здесь обновленный код:

import time
import pyshark
import concurrent.futures
import threading
import logging
from queue import Queue
from multiprocessing import Process
from watchdog.observers import Observer, api
from watchdog.events import PatternMatchingEventHandler

class Handler(PatternMatchingEventHandler):
    patterns = ["*.pcap", "*.pcapng"]

    def __init__(self, queue):
        PatternMatchingEventHandler.__init__(self)
        self.queue = queue

    def process(self, event):
        #print(f'event type: {event.event_type}  path : {event.src_path}')   
        self.queue.put(event.src_path)
        logging.info(f"Storing message: {self.queue.qsize()}")
        print("Producer queue: ", list(self.queue.queue))
        #self.queue.get()

    def on_created(self, event):
        self.process(event)          


def StartWatcher(watchdogq, event):
    path = 'C:\\...'
    handler = Handler(watchdogq)
    observer = Observer()
    while not event.is_set():
        observer.schedule(handler, path, recursive=False)
        print("About to start observer")
        observer.start()
        try:
            while True:
                time.sleep(1)
        except Exception as error:
            observer.stop()
            print("Error: " + str(error))
        observer.join()


def ReadPcapFiles(consumerq, event):
    while not event.is_set() or not consumerq.empty():
        print("Consumer queue: ", consumerq.get())
        #print("Consumer queue: ", list(consumerq.queue))

    # pcapfile = pyshark.FileCapture(self.queue.get())
    #     for packet in pcapfile:
    #         countPacket +=1 

if __name__ == '__main__':
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,datefmt="%H:%M:%S")
    logging.getLogger().setLevel(logging.DEBUG)

    queue = Queue()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(StartWatcher,queue, event)
        executor.submit(ReadPcapFiles,queue, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

СТАРЫЙ КОД:

import time
from queue import Queue
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler

class Handler(PatternMatchingEventHandler):
    patterns = ["*.pcap", "*.pcapng"]

    def __init__(self, queue):
        PatternMatchingEventHandler.__init__(self)
        self.queue = queue

    def process(self, event):
        print(f'event type: {event.event_type}  path : {event.src_path}')   
        self.queue.put(event.src_path)

    def on_created(self, event):
        self.process(event)

class Watcher():
    def __init__(self, path):
        self.queue = Queue()
        self.observer = Observer()
        self.handler = Handler(self.queue)
        self.path = path

    def start(self): 
        self.observer.schedule(self.handler, self.path, recursive=True)
        self.observer.start()
        try:
            while True:
                time.sleep(1)
                self.queue.get()
                print(list(self.queue.queue))
        except Exception as error:
            self.observer.stop()
            print("Error: " + str(error))
        self.observer.join()  

if __name__ == '__main__':
    watcher = Watcher('C:\\...')
    watcher.start()

Ответы [ 2 ]

1 голос
/ 19 марта 2020

Это работает для меня (я понял основную идею из этого ответа , спасибо!), Но обратите внимание, что я считаю это обходным путем, поэтому, если у кого-то есть лучшее решение этого вопроса или он может лучше объяснить Причина такого поведения в Python, пожалуйста, не стесняйтесь ответить!

Я предполагаю, что у меня были две основные проблемы:
- я запускал процесс Watchdog внутри другого потока (и это как-то блокировало мой поток, потребляющий очередь).
- Python многопоточность на самом деле не работает параллельно и поэтому запуск независимого процесса был необходим.

Вот мой код:

import time
import pyshark
import threading
import logging
import os
from queue import Queue
from multiprocessing import Process, Pool
from watchdog.observers import Observer, api
from watchdog.events import PatternMatchingEventHandler
from concurrent.futures import ThreadPoolExecutor

class Handler(PatternMatchingEventHandler):
    patterns = ["*.pcap", "*.pcapng"]

    def __init__(self, queue):
        PatternMatchingEventHandler.__init__(self)
        self.queue = queue

    def process(self, event):  
        self.queue.put(event.src_path)
        logging.info(f"Storing message: {self.queue.qsize()}")
        print("Producer queue: ", list(self.queue.queue))


    def on_created(self, event):
        #wait that the transfer of the file is finished before processing it
        file_size = -1
        while file_size != os.path.getsize(event.src_path):
            file_size = os.path.getsize(event.src_path)
            time.sleep(1)

        self.process(event)         

def ConsumeQueue(consumerq):
    while True:
        if not consumerq.empty(): 
            pool = Pool()
            pool.apply_async(ReadPcapFiles, (consumerq.get(), ))
        else:    
            time.sleep(1)

def ReadPcapFiles(get_event):        
    createdFile = get_event
    print(f"This is my event in ReadPacapFile {createdFile}")

    countPacket = 0
    bandwidth = 0
    pcapfile = pyshark.FileCapture(createdFile)
    for packet in pcapfile:
        countPacket +=1
        bandwidth = bandwidth + int(packet.length)
    print(f"Packet nr {countPacket}")
    print(f"Byte per second {bandwidth}")


if __name__ == '__main__':

    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,datefmt="%H:%M:%S")
    logging.getLogger().setLevel(logging.DEBUG)

    queue = Queue()
    path = 'C:\\...'

    worker = threading.Thread(target=ConsumeQueue, args=(queue, ), daemon=True)
    print("About to start worker")
    worker.start()

    event_handler = Handler(queue)
    observer = Observer()
    observer.schedule(event_handler, path, recursive=False)
    print("About to start observer")
    observer.start()

    try:
        while True:
            time.sleep(1)
    except Exception as error:
        observer.stop()
        print("Error: " + str(error))
    observer.join()
0 голосов
/ 19 марта 2020

Существует отличная библиотека, которая обеспечивает одновременный доступ к элементам в этой очереди. Очередь также является постоянной (как на основе файлов, так и на основе базы данных), поэтому, если программа аварийно завершает работу, вы все равно можете использовать события из точки, в которой произошел сбой программы.

persist-queue

...