Очистка репо Python - PullRequest
0 голосов
/ 29 мая 2019

У меня есть сценарий, который я почти на 100% выполнил, но есть еще один шаг, который я не могу понять.Мой сценарий в настоящее время проверяет место назначения, чтобы увидеть, существует ли файл, и если он существует, то файл из исходного местоположения не перемещается.Проблема, с которой я сталкиваюсь, состоит в том, что код не проверяет все подкаталоги, а также только корневой каталог.

Я использую os.walk, чтобы просмотреть все файлы в исходной папке, но яне уверен, как os.walk папка назначения и исходная папка в сочетании друг с другом.

import time
import sys
import logging
import logging.config


def main():
    purge_files

def move_files(src_file):

    try:
        #Attempt to move files to dest
        shutil.move(src_file, dest)
        #making use of the OSError exception instead of FileExistsError due to older version of python not contaning that exception 
    except OSError as e:
        #Log the files that have not been moved to the console
        logging.info(f'Files File already exists: {src_file}')
        print(f'File already exists: {src_file}')
        #os.remove to delete files that are already in dest repo
        os.remove(src_file)
        logging.warning(f'Deleting: {src_file}')

def file_loop(files, root):

    for file in files:
        #src_file is used to get the full path of everyfile
        src_file = os.path.join(root,file)

        #The two variables below are used to get the files creation date
        t = os.stat(src_file)
        c = t.st_ctime
        #If the file is older then cutoff code within the if statement executes

        if c<cutoff:

            move_files(src_file)
        #Log the file names that are not older then the cutoff and continue loop
        else:
            logging.info(f'File is not older than 14 days: {src_file}')
            continue

def purge_files():

    logging.info('invoke purge_files method')
    #Walk through root directory and all subdirectories
       for root, subdirs, files in os.walk(source):
          dst_dir = root.replace(source, dest)

           #Loop through files to grab every file
           file_loop(files, root)

       return files, root, subdirs


files, root, subdirs = purge_files()

Я ожидал, что вывод переместит все файлы из источника в dest.Перед перемещением файлов я ожидал проверить все файлы в расположении dest, включая subdir из dest, если какой-либо из них совпадает с исходным, то они не будут перемещены в dest.Я не хочу, чтобы папки, которые находятся в источнике.Я просто хочу, чтобы все файлы были перемещены в корневой каталог.

Ответы [ 2 ]

1 голос
/ 30 мая 2019

Я вижу, что вы уже написали большую часть кода, но в настоящее время он содержит довольно много ошибок:

  • Код имеет неправильный отступ, что делает его недействительным кодом Python.
  • Некоторые операторы импорта отсутствуют (например, для shutil).
  • Вы имеете в виду неопределенные переменные (например, source).

Если я копирую и вставляю ваш код в мою среду IDE, я получаю 26 ошибок от pep8 и pylint, и после исправления ошибки отступа я получаю 49 ошибок. Это заставляет меня задуматься, действительно ли это ваш код или вы сделали ошибки копирования-вставки. В любом случае, использование IDE определенно поможет вам проверить ваш код и обнаружить ошибки раньше. Попробуйте!

Поскольку я не могу запустить ваш код, я не могу точно сказать, почему он не работает, но я могу дать вам несколько советов.

Одна вещь, которая вызывает много вопросов, это следующая строка:

dst_dir = root.replace(source, dest)

Помимо неверного отступа, переменная dst_dir нигде не используется. Так какой смысл в этом утверждении? Также обратите внимание, что это заменяет все вхождения source в root. Для тривиальных случаев это не будет проблемой, но это не очень надежно при любых обстоятельствах. Поэтому по возможности используйте операции с путями из стандартной библиотеки и старайтесь не выполнять ручные строковые операции с путями. В Python 3.4 был введен модуль Pathlib. Я рекомендую использовать это.

Использование os.walk() может быть довольно удобным в некоторых случаях, но может быть не лучшим решением для вашего варианта использования. Возможно, рекурсивное использование os.listdir() будет намного проще, особенно если каталог назначения будет плоским (то есть фиксированным каталогом без подкаталогов).

Возможная реализация (с использованием pathlib и os.listdir()) может быть следующей:

import logging
import os
import pathlib
import shutil
import time

SOURCE_DIR_PATH = pathlib.Path('C:\\Temp')
DESTINATION_DIR_PATH = pathlib.Path('D:\\archive')

CUTOFF_DAYS = 14
CUTOFF_TIME = time.time() - CUTOFF_DAYS * 24 * 3600  # two weeks


def move_file(src_file_path, dst_dir_path):
    logging.debug('Moving file %s to directory %s', src_file_path,
                  dst_dir_path)
    return  # REMOVE THIS LINE TO ACTUALLY PERFORM FILE OPERATIONS
    try:
        shutil.move(str(src_file_path), str(dst_dir_path))
    except OSError:
        logging.info('File already exists in destination directory: %s',
                     src_file_path)
        logging.warning('Deleting file %s', src_file_path)
        src_file_path.unlink()


def move_files(src_file_paths, dst_dir_path):
    for src_file_path in src_file_paths:
        if src_file_path.stat().st_ctime < CUTOFF_TIME:
            logging.info('Moving file older than %d days: %s', CUTOFF_DAYS,
                         src_file_path)
            move_file(src_file_path, dst_dir_path)
        else:
            logging.info('Not moving file less than %d days old: %s',
                         CUTOFF_DAYS, src_file_path)


def purge_files(src_dir_path, dst_dir_path):
    logging.info('Scanning directory %s', src_dir_path)
    names = os.listdir(src_dir_path)
    paths = [src_dir_path.joinpath(name) for name in names]
    file_paths = [path for path in paths if path.is_file()]
    dir_paths = [path for path in paths if path.is_dir()]
    # Cleanup files
    move_files(file_paths, dst_dir_path)
    # Cleanup directories, recursively.
    for dir_path in dir_paths:
        purge_files(dir_path, dst_dir_path)


def main():
    logging.basicConfig(format='%(message)s', level=logging.DEBUG)
    purge_files(SOURCE_DIR_PATH, DESTINATION_DIR_PATH)


if __name__ == '__main__':
    main()

Я тестировал этот код, и он работал.

Обратите внимание, что я использовал ту же обработку ошибок для move_file, что и в вашем примере. Тем не менее, я не думаю, что это достаточно надежно. Что, если в исходном каталоге есть два файла с одинаковым именем (в разных подкаталогах или в разное время)? Затем второй файл будет удален без резервного копирования. Кроме того, в случае других ошибок (таких как «переполнение диска» или «ошибка сети») код просто предполагает, что файл уже был заархивирован, а оригинал удален. Я не знаю ваш вариант использования, но я бы серьезно подумал переписать эту функцию.

Однако я надеюсь, что эти предложения и пример кода приведут вас на правильный путь.

0 голосов
/ 29 мая 2019

Возможно, вы захотите очистить свой код, он полон ошибок.Например, 'purge_files' вместо 'purge_files ()' в main, ошибки отступа внутри purge_files и т. Д. Также, казалось бы, случайные переводы строк между кодом делают его немного неудобным для чтения (по крайней мере, для меня):)

EDITЯ быстро просмотрел ваш код и изменил несколько вещей.В основном имена переменных.Я заметил, что у вас есть несколько переменных с неописуемыми именами ('i', 't' и т. Д.) Вместе с комментарием, описывающим, что означает эта переменная.Если вы просто измените имя переменной на что-то более описательное, вам не нужны комментарии, и ваш код станет еще проще.Обратите внимание, что я не тестировал этот код, и, вероятно, он даже не запускается (поскольку это не было моей целью, а скорее показать некоторые изменения стиля, которые я бы предложил):)

import os 
import shutil
import time
import errno
import time
import sys
import logging
import logging.config


# NOTE: It is a convention to write constants in all caps
SOURCE = r'C:\Users\Desktop\BetaSource'
DEST = r'C:\Users\Desktop\BetaDest'
#Gets the current time from the time module
now = time.time()
#Timer of when to purge files
cutoff = now - (14 * 86400)
all_sources = []
all_dest_dirty = []
logging.basicConfig(level = logging.INFO,
                    filename = time.strftime("main-%Y-%m-%d.log"))


def main():
    # NOTE: Why is this function called / does it exist? It only sets a global
    # 'dest_files' which is never used...
    dest_files()
    purge_files()


# I used the dess_files function to get all of the destination files
def dest_files():
    for root, subdirs, files in os.walk(DEST):
        for file in files:
            # NOTE: Is it really necessary to use a global here?
            global all_dirty
            all_dirty.append(files)


def purge_files():
    logging.info('invoke purge_files method')
    # I removed all duplicates from dest because cleaning up duplicates in
    # dest is out of the scope
    # NOTE: This is the perfect usecase for a set
    all_dest_clean = set(all_dest_dirty)
    # os.walk used to get all files in the source location 
    for source_root, source_subdirs, source_files in os.walk(SOURCE):
        # looped through every file in source_files
        for file in source_files:
            # appending all_sources to get the application name from the
            # file path
            all_sources.append(os.path.abspath(file).split('\\')[-1]) 
            # looping through each element of all_source
            for source in all_sources:
                # logical check to see if file in the source folder exists
                # in the destination folder
                if source not in all_dest_clean:
                    # src is used to get the path of the source file this
                    # will be needed to move the file in shutil.move
                    src =  os.path.abspath(os.path.join(source_root, source))
                    # the two variables used below are to get the creation
                    # time of the files
                    metadata = os.stat(src)
                    creation_time = metadata.st_ctime
                    # logical check to see if the file is older than the cutoff
                    if creation_time < cutoff:
                        logging.info(f'File has been succesfully moved: {source}')
                        print(f'File has been succesfully moved: {source}')
                        shutil.move(src,dest)
                        # removing the already checked source files for the
                        # list this is also used in other spots within the loop
                        all_sources.remove(source)
                    else:
                        logging.info(f'File is not older than 14 days: {source}')
                        print(f'File is not older than 14 days: {source}')
                        all_sources.remove(source)
                else:
                    all_sources.remove(source)
                    logging.info(f'File: {source} already exists in the destination')
                    print(f'File: {source} already exists in the destination')


if __name__ == '__main__':
    main()
...