Как я могу написать модульные тесты для классов, которые работают до прерывания клавиатуры и ничего не возвращают? - PullRequest
0 голосов
/ 05 июля 2019

Я работаю над сервисом Python, который отслеживает каталог в файловой системе.Когда он видит, что файл был создан или перемещен туда, он отправляет путь к файлу в очередь Kafka.У меня служба работает точно так, как мне нужно, но моя проблема в том, что у меня должно быть как минимум 90% покрытия модульными тестами.Я относительно новичок в Python, и я никогда раньше не использовал модульное тестирование ни на одном языке, поэтому я чувствую себя действительно не в своей тарелке.Я просто не могу обдумать, как я буду тестировать эти классы.

Это класс, который контролирует файловую систему, я использую библиотеку watchdog .

Я добавил параметр handler=FileHandler в init , потому что решил, что могу использовать его для передачи классу фальшивого обработчика, который я мог бы использовать для тестов, но кажется, что он излишне сложен.

class FileSystemMonitor:

    def __init__(self, target_path, kafka_queue, handler=FileHandler):
        self.path = target_path
        self.queue = kafka_queue
        self.handler = handler(self.queue)

    def start(self):
        observer = Observer()
        observer.schedule(self.handler, self.path, recursive=True)
        observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            observer.stop()
        observer.join()

def parse_args():
    path = sys.argv[1] if len(sys.argv) > 1 else '.'
    queue = sys.argv[2] if len(sys.argv) > 2 else 'default'
    return path, queue

if __name__ == "__main__":
    path, queue = parse_args()
    monitor = FileSystemMonitor(path, queue)
    monitor.start()

Это класс, который я создал, который обрабатывает события, генерируемые монитором, и передает путь в очередь Kafka.

class FileHandler(PatternMatchingEventHandler):

    def __init__(self, queue):
        super(FileHandler, self).__init__(ignore_patterns=["*/.DS_Store"], ignore_directories=True)
        self.queue = queue

    def on_any_event(self, event):
        super(FileHandler, self).on_any_event(event)
        #print(event, self.queue)
        result = kafkaProducer.send_msg(self.queue, event.src_path, event.event_type)
        print("Handler:", result)
        return result

Я написал несколько тестов для класса kafkaProducer, и я не сделалс этим очень трудно, потому что он на самом деле возвращает значение, которое я мог бы протестировать.

FileSystemMonitor работает бесконечно и просто ждет прерывания клавиатуры, а когда он заканчивается, он ничего не возвращаетИтак, как мне написать модульные тесты для него?

Что касается класса FileHandler, то он зависит от событий, запускаемых классом монитора, так как же мне выделить класс Handler для его проверки?

1 Ответ

2 голосов
/ 08 июля 2019

FileSystemMonitor.start очень трудно проверить, поскольку он блокируется до тех пор, пока не произойдет внешнее событие, но тест не может легко вызвать событие из-за блокировки. Я полагаю, что вы могли бы сделать несколько трюков с многопоточностью или многопроцессорностью или, может быть, просто с помощью таймера, но это добавило бы неопределенности в ваш тест, который мне не нравится.

Более явный подход - позволить вызывающей стороне указать, что происходит внутри цикла while, чтобы исключение могло быть вызвано в тесте, пока в производственном коде будет вызываться time.sleep.

class FileSystemMonitor:
    def __init__(self, target_path, kafka_queue, handler=FileHandler):
        self.path = target_path
        self.queue = kafka_queue
        self.handler = handler(self.queue)

    def start(self, loop_action):
        observer = Observer()
        observer.schedule(self.handler, self.path, recursive=True)
        observer.start()
        try:
            while True:
                loop_action()
        except KeyboardInterrupt:
            observer.stop()
        observer.join()

Вот как будет выглядеть ваш тест:

def fake_loop_action():
    raise KeyboardInterrupt

def test_FileSystemMonitor():
    # Initialize target_path, kafka_queue and handler here.
    # You might want to use test doubles.
    monitor = FileSystemMonitor(target_path, kafka_queue, handler)
    monitor.start(loop_action=fake_loop_action)

А в рабочем коде вы бы использовали time.sleep вместо этого. Вы даже можете указать задержку вызова сейчас.

monitor.start(loop_action=lambda: time.sleep(1))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...