У меня есть папка на сервере, которая будет непрерывно получать некоторые файлы в течение дня. Мне нужно посмотреть каталог, и как только файл получен, нужно начать некоторую обработку этого файла. Иногда обработка может занять немного больше времени, в зависимости от размера файла, который может достигать 20 ГБ.
Я использую concurrent.futures.ThreadPoolExecutor для обработки нескольких файлов на ходу. Но мне нужна помощь в понимании того, как обрабатывать следующий сценарий: -
Я получил 5 файлов (4 маленьких и 1 огромный файл) одновременно, ThreadPoolExecutor забирает все 5 файлов для обработки. Обработка 4 маленьких файлов занимает несколько секунд, но обработка большого файла занимает 20 минут. Теперь у меня есть еще 10 файлов, ожидающих в папке, пока обрабатывается большой файл.
Я установил max_workers = 5, но теперь запускается только один рабочий ThreadPoolExecutor для обработки большого файла, который блокирует выполнение следующего набора файлов. Как мы можем начать обрабатывать другие файлы, пока 4 рабочих свободны в это время.
import os
import time
import random
import concurrent.futures
import datetime
import functools
def process_file(file1, input_num):
# Do some processing
os.remove(os.path.join('C:\\temp\\abcd',file1))
time.sleep(10)
def main():
print("Start Time is ",datetime.datetime.now())
#It will be a continuous loop which will watch a directory for incoming file
while True:
#Get the list of files in directory
file_list = os.listdir('C:\\temp\\abcd')
print("file_list is", file_list)
input_num = random.randint(1000000000,9999999999)
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
process_file_arg = functools.partial(process_file, input_num = input_num)
executor.map(process_file_arg, file_list)
time.sleep(10)
if __name__ == '__main__':
main()
функция main () постоянно просматривает каталог и вызывает ThreadPoolExecutor