Python ThreadPoolExecutor с непрерывным неограниченным вводом - PullRequest
0 голосов
/ 06 октября 2019

У меня есть папка на сервере, которая будет непрерывно получать некоторые файлы в течение дня. Мне нужно посмотреть каталог, и как только файл получен, нужно начать некоторую обработку этого файла. Иногда обработка может занять немного больше времени, в зависимости от размера файла, который может достигать 20 ГБ.

Я использую concurrent.futures.ThreadPoolExecutor для обработки нескольких файлов на ходу. Но мне нужна помощь в понимании того, как обрабатывать следующий сценарий: -

Я получил 5 файлов (4 маленьких и 1 огромный файл) одновременно, ThreadPoolExecutor забирает все 5 файлов для обработки. Обработка 4 маленьких файлов занимает несколько секунд, но обработка большого файла занимает 20 минут. Теперь у меня есть еще 10 файлов, ожидающих в папке, пока обрабатывается большой файл.

Я установил max_workers = 5, но теперь запускается только один рабочий ThreadPoolExecutor для обработки большого файла, который блокирует выполнение следующего набора файлов. Как мы можем начать обрабатывать другие файлы, пока 4 рабочих свободны в это время.


import os
import time
import random
import concurrent.futures
import datetime
import functools

def process_file(file1, input_num):
    # Do some processing
    os.remove(os.path.join('C:\\temp\\abcd',file1))
    time.sleep(10)    

def main():
    print("Start Time is ",datetime.datetime.now())

    #It will be a continuous loop which will watch a directory for incoming file
    while True:
        #Get the list of files in directory
        file_list = os.listdir('C:\\temp\\abcd')
        print("file_list is", file_list)
        input_num = random.randint(1000000000,9999999999)

        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            process_file_arg = functools.partial(process_file, input_num = input_num)
            executor.map(process_file_arg, file_list)

        time.sleep(10)

if __name__ == '__main__':
    main()

функция main () постоянно просматривает каталог и вызывает ThreadPoolExecutor

...