Архивирование файлов в Twistd без блокировки. - PullRequest
0 голосов
/ 28 мая 2018

Есть ли способ архивировать файлы без блокировки в витой?

import zipfile
from twisted.internet import defer
from twisted.internet import reactor

def zip_files(file_list, path, output_Zip):
    zip_handle = zipfile.ZipFile(output_zip,  mode='w', allowZip64=True)
    try:
        for i in file_list:
            zip_handle.write(i)
        zip_handle.close()
        return True
    except Exception as e:
        return False
def print_zip(res):
    print res
    return res


file_list = ['path_to_file1','path_to_file2']
output_path = 'full_path_to_output_zip'
d = defer.Deferred()
d.addCallback(lambda _: zip_files(file_list, output_path)
d.addCallback(print_zip)
zip_result = d
reactor.run()

У меня это пока что.Хотя это работает, запуск процесса архивирования приводит к тому, что витая блокируется и ждет, пока начальное «задание на сжатие» не будет завершено.Я бы предпочел прекратить существующее «почтовое задание» и начать новое.

Ответы [ 2 ]

0 голосов
/ 28 мая 2018
import zipfile
from twisted.internet import defer, reactor

def main():
    file_list = ['path_to_file1','path_to_file2']
    output_path = 'full_path_to_output.zip'
    zip_obj = zipfile.ZipFile(output_path, mode='w', allowZip64=True)

    d = zip_files(zip_obj, file_list)
    d.addCallback(handle_success)
    d.addErrback(handle_error)
    d.addBoth(close_zip_obj, zip_obj = zip_obj)

@defer.inlineCallbacks
def zip_files(zip_obj, file_list):
    for item in file_list:
        yield zip_obj.write(item)
        # handle "interrupts" here

def handle_success(ignore):
    print('Done zipping')

def handle_error(failure):
    print('Error: {0}'.format(failure.value))

def close_zip_obj(ignore, zip_obj):
    print('Closing zip object')
    zip_obj.close()

main()
reactor.run()

Я старался, чтобы мой пример был простым, чтобы новички в Twisted не запутались.Объект ZipFile создается снаружи и передается в zip_files() (который теперь отмечен @inlineCallbacks и возвращает 'Deferred'), таким образом, к нему можно легко получить доступ при необходимости.Обработанные вызовы и вызовы ошибок (через addCallback/addErrback) обновляют эти функции в соответствии с вашими потребностями.Наконец, объект ZipFile, который передается в close_zip_obj() из главной функции, закрывается после завершения архивирования.Это должно обрабатывать множество файлов среднего размера довольно быстро.Для больших файлов вы «должны» быть в порядке, выполняя задачу в deferToThread с использованием вашего исходного кода.

Однако вы сделали очень размытый комментарий:

Я бы предпочел прекратить существующее «задание zip» и запустить новое.

Предполагается, что если вы находитесь в середине zip, вы хотите остановить текущий zipи начать еще один.С deferToThread или любым многопоточным подходом становится утомительным передавать флаги между потоками, устанавливать / снимать блокировки и синхронизироваться с другими потоками.Просто имейте это в виду, если решите использовать нити.

0 голосов
/ 28 мая 2018

Возможно, что-то вроде этого, используя DeferredList из deferToThread с, чтобы не блокировать запись zip-файлов:

import zipfile
import logging
from twisted.internet import threads, defer
from twisted.internet import reactor

log = logging.getLogger()
log.addHandler(logging.StreamHandler())
log.setLevel(logging.INFO)


def zip_file(input_path, output_path):
    with zipfile.ZipFile(output_path,  mode='w', allowZip64=True) as zip_handle:
        zip_handle.write(input_path)


def log_failure(err):
    log.exception("error: %s", err)


def zip_file_and_catch_error(input_path, output_path):
    d = threads.deferToThread(zip_file, input_path, output_path)
    d.addErrback(log_failure)
    return d


def main():
    input_paths = ['path_to_file1','path_to_file2']
    output_paths = ['path_out1','path_out2']
    assert len(input_paths) == len(output_paths)
    dl = defer.DeferredList([zip_file_and_catch_error(input_path, output_path) 
                             for input_path, output_path in zip(input_paths, output_paths)])
    dl.addCallback(lambda result: log.info("result: %s", result))
    dl.addBoth(lambda _: reactor.callLater(0, reactor.stop))
    reactor.run()


if __name__ == "__main__":
    main()
...