Рекурсивно проверять массив на наличие новых элементов и запускать действие - PullRequest
0 голосов
/ 14 января 2019

Я запускаю скрипт, который генерирует файлы каждые 60 секунд, которые я затем сохраняю в массиве:

my_files = [file1, file2, ..., filen]

Поскольку этот массив продолжает динамически расти, мне нужно рекурсивно продолжать проверять наличие новых файлов. Каждый раз, когда появляются новые файлы, мне нужно добавить их в «очередь» и ждать, пока они будут обработаны путем вызова других функций.

Каков наилучший способ сделать это? Я изучил сторожевой таймер, но он кажется очень сложным для того, что мне нужно Я также попытался использовать следующий класс, который позволит мне рекурсивно проверять мою папку, но затем я не могу перебирать файлы, которые «выбираются», потому что это не list:

class RepeatedTimer(object):

    def __init__(self, interval, function, *args, **kwargs):
        self._timer = None
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.is_running = False
        self.next_call = time.time()
        self.start()

     def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

     def start(self):
         if not self.is_running:
             self.next_call += self.interval
             self._timer = threading.Timer(self.next_call - time.time(), self._run)
             self._timer.start()
             self.is_running = True

     def stop(self):
         self._timer.cancel()
         self.is_running = False

Следующее не будет работать, поэтому я не могу получить доступ к файлам:

my_files = [RepeatedTimer(30, pick_testing_files)]

while True:
    if len(my_files) > 0:
        files_to_be_processed = my_files.pop()
        threading.Thread(target=test, args=(files_to_be_processed)).start()

1 Ответ

0 голосов
/ 14 января 2019

Кажется, что-то подобное работает (насколько я могу судить по моему ограниченному тестированию). Я добавил threading.Lock для защиты общего ресурса my_files от одновременного доступа.

import random
import string
import threading
import time


class RepeatedTimer:
    def __init__(self, interval, function, *args, **kwargs):
        self._timer = None
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.is_running = False
        self.next_call = time.time()
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self.next_call += self.interval
            self._timer = threading.Timer(self.next_call - time.time(), self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False


def pick_testing_files(filenames, lock):
    newfiles = []
    with lock:
        while True:
            try:
                filename = filenames.pop()
            except IndexError:  # List is empty.
                break
            newfiles.append(filename)

    print('files retrieved:', newfiles)

def random_filename():
    letters = []
    for _ in range(random.randint(4, 8)):
        letters.append(random.choice(string.ascii_lowercase))
    return ''.join(letters)


my_files = []
filelist_lock = threading.Lock()

watcher = RepeatedTimer(5, pick_testing_files, my_files, filelist_lock)
#watcher.start()  # Not needed because RepeatedTimer's start themselves.

for _ in range(30):  # Run test for 30 seconds.
    # Add some filenames to my_files list.
    with filelist_lock:
        for _ in range(random.randint(1, 4)):  # Generate some filenames.
            my_files.append(random_filename())
    print('new files added')
    time.sleep(1)  # Wait a little before adding more filenames.

watcher.cancel()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...