Использование ThreadPoolExecutor с уменьшенным объемом памяти - PullRequest
0 голосов
/ 01 ноября 2018

Я использую ThreadPoolExecutor для загрузки огромного (~ 400k) количества изображений ключевых кадров. Имена ключевых кадров хранятся в текстовом файле (скажем, keyframes_list.txt ).

Я изменил пример, приведенный в документации , и кажется, что он работает безупречно с одним исключением: как ясно, пример передает каждую ссылку на объект future, который все передается итерируемому (dict() если быть точным). Эта итерация передается в качестве аргумента функции as_completed() для проверки завершения future. Это, конечно, требует огромного количества текста, загружаемого одновременно в память. Мой процесс Python для этой задачи занимает 1 ГБ оперативной памяти.

Полный код показан ниже:

import concurrent.futures
import requests

def download_keyframe(keyframe_name):
    url = 'http://server/to//Keyframes/{}.jpg'.format(keyframe_name)
    r = requests.get(url, allow_redirects=True)
    open('path/to/be/saved/keyframes/{}.jpg'.format(keyframe_name), 'wb').write(r.content)
    return True

keyframes_list_path = '/path/to/keyframes_list.txt'
future_to_url = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    with open(keyframes_list_path, 'r') as f:
        for i, line in enumerate(f):
            fields = line.split('\t')
            keyframe_name = fields[0]
            future_to_url[executor.submit(download_keyframe, keyframe_name)] = keyframe_name
    for future in concurrent.futures.as_completed(future_to_url):
        keyframe_name = future_to_url[future]
        try:
            future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (keyframe_name, exc))
        else:
            print('Keyframe: {} was downloaded.'.format(keyframe_name))

Итак, мой вопрос заключается в том, как я мог бы обеспечить итеративность, и при этом сохранить объем памяти. Я подумал об использовании queue, но не уверен, что он гладко взаимодействует с ThreadPoolExecutor. Есть ли простой способ контролировать количество future s, отправленных на ThreadPoolExecutor?

1 Ответ

0 голосов
/ 01 ноября 2018

Если мы посмотрим на источник для as_completed(), первое, что он делает, - оценивает любую итерацию, которую вы передаете в качестве первого аргумента, в строке 221, с fs=set(fs). Так что, пока вы читаете и ставите в очередь весь файл одновременно, as_completed() будет загружать все эти экземпляры Future в память при его вызове.

Чтобы обойти это, вам нужно разделить входные данные и вызывать только as_completed с подмножеством Futures на каждой итерации. Вы можете использовать фрагмент из этого ответа ; куски ~ 1k должны поддерживать ваш пул потоков насыщенным, не занимая при этом чрезмерной памяти. Ваш окончательный код, начиная с блока with для ThreadPoolExecutor, должен выглядеть примерно так:

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    for lines in grouper(open(keyframes_list_path, 'r'), 1000):
        # reset the dict that as_completed() will check on every iteration
        future_to_url = {}
        for i, line in enumerate(lines):
            fields = line.split('\t')
            keyframe_name = fields[0]
            future_to_url[executor.submit(download_keyframe, keyframe_name)] = keyframe_name
        for future in concurrent.futures.as_completed(future_to_url):
            keyframe_name = future_to_url[future]
            try:
                future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (keyframe_name, exc))
            else:
                print('Keyframe: {} was downloaded.'.format(keyframe_name))
...