python multiprocessing.Pool с итератором не может использовать все ресурсы ЦП, если использовать список вместо огромного использования памяти - PullRequest
0 голосов
/ 07 февраля 2020

Приведенный ниже фрагмент кода может точно рассказать о моей проблеме, но я не могу предоставить исполняемый код, чтобы точно воспроизвести проблему, потому что, если размер ввода мал, тогда все будет хорошо, и для подходов iterator+pool или list+pool нет разницы.

def worker(comb_a: Dict, comb_b: Dict):
    # Process `comb_a` and `comb_b`, then generate a new dict.
    #
    # The computation doing in this method is NOT heavy,
    # for some cases it can be quite light, just check some
    # conditions, then return None directly.
    return dict(...)

def merge_combs(all_combs: Dict):
    iterators = []
    for x in all_combs.values():
        iterators.append(itertools.product(x[key_to_list_a], x[key_to_list_b]))

    # So basically the iterator will generate some combinations(tuples),
    # which can be supplied to the worker, each combination size is
    # around 1KB, and for my real case there will be 40~150 million of
    # these combinations.
    with multiprocessing.Pool(processes=cpu_count) as p:
        res = p.starmap_async(worker, itertools.chain(*iterators))
        p.close()
        p.join()
        merged_combs = res.get()
        # Proceed with the merged combinations...

Так что в принципе у меня есть две проблемы:

  1. С подходом выше pool+iterator, даже с примерно 100 миллионами комбинаций в моем регионе, который имеет 8 ядер процессор и 16 ГБ памяти, все работает отлично, я вижу все 8 рабочих процессов, весь цикл займет около 400 с; Но когда я использую тот же код для запуска на моем экземпляре GCP, на котором у меня есть 32vCPU + 120 ГБ памяти, , тогда я все еще могу видеть только 8 рабочих процессов, и около 60 +% процессор простаивает с помощью команды top и полный цикл занимает около 360 + с (я ожидаю, что он будет иметь 32 рабочих процесса, а весь цикл должен занять около 100 с +), поэтому можно наблюдать лишь небольшое улучшение ; Поэтому у меня возникают трудности с выяснением и пониманием того, почему остается много простаивающих процессоров и ресурсов памяти, но есть НЕ 32 рабочих процесса для выполнения работы быстрее!
  2. Если я преобразовать итератор в список (подход list+pool) перед вызовом многопроцессорной обработки. Тогда я НЕ смогу выполнить процедуру вообще, ни на моем локальном ноутбуке, ни на моем мощном экземпляре GCP, это будет безумно забрать всю память 120 ГБ, затем закончить не могу выделить ошибку памяти, затем cra sh, но да, в этом случае для моего мощного экземпляра GCP я могу видеть все 32 рабочих процесса, и было NO в режиме ожидания CPU ушел. Так что, если я работаю с огромным списком, как я могу обойти этот безумный случай использования памяти?

Я прочитал много постов, касающихся использования памяти в многопроцессорном пуле python с огромными данными вчера, но до сих пор не имею четкого представления о том, что является лучшим подходом для моего случая, как правильно использовать COW (копирование при записи) ОС.

  • Локальный ноутбук : macOS Catalina, 2,9 ГГц четырехъядерный процессор Intel Core i7, 16 ГБ, 2133 МГц, LPDDR3
  • Экземпляр GCP: Ubuntu 16.04LTS, 32 В ЦП, 120 ГБ памяти
  • Python: 3.6.9

Некоторые другие способы, которые я пробовал:

путь 1:

Я пытался сохранить эти два начальных списка (которые позже используются для выполнения product для генерации огромных комбинаций) как глобальные переменные, затем product индекс этих двух списков, затем передайте комбинацию индексов работнику, затем работник обратится к двум исходным глобальным переменным списков по индексу для извлечения данных, затем обработает, но все равно безумная память утилизирует

путь 2:

Я пытался использовать Itertools Recipes -> grouper , чтобы сгруппировать огромные комбинации так, чтобы каждый работник каждый раз не требовал ОДНОЙ расчески (Только (sub_comb_0, sub_comb_1)), но список ячеек (((sub_comb_0, sub_comb_1), (sub_comb_2, sub_comb_3), ...)), намерение сделать работника немного тяжелым, но неожиданно обнаружил, что это grouper вызвало безумное использование памяти, даже если оно все еще основано на итераторе, и даже с n=1:

def iterable_grouper(iterable: Iterable, n: int, fillvalue=None):
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)

Окончательное обновление

На случай, если кто-то доберется до подобной ситуации: я наконец перестроил часть обработки данных в Golang, так что Python клиент отправляет данные на Golang сервер через UDS (unix доменный сокет), а затем обрабатывается там.

Я не могу полностью сделать это в Golang, потому что исходные данные извлекаются откуда-то еще через один Python SDK.

Эта первая черновая версия может обрабатывать около 10 миллионов записей за 30 секунд на моем локальном ноутбуке с 8-ядерным процессором и 16 ГБ памяти, я считаю, что это может быть улучшено, а скорость растет линейно когда я предоставляю ему больше ресурсов для моего экземпляра GCP.


С помощью pprof Go я смог быстро повысить скорость примерно до 19 миллионов записей за 30 с, что уже очень впечатляет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...