Распараллелить понимание словаря Python - PullRequest
0 голосов
/ 09 июля 2019

Я пытаюсь распараллелить подмножество словаря Python. Приведенный ниже код создает новый словарь, positions_sub, в зависимости от того, находятся ли ключи в словаре positions в списке, node_list:

positions_sub = {}
for k,v in positions.items():
    if k in node_list:
        positions_sub[k] = v

Этот код работает просто отлично и делает именно то, что я хочу. Однако для запуска требуется некоторое время, поэтому я пытаюсь распараллелить его. Я пытался сделать это в приведенном ниже коде, но он возвращает positions_sub в виде списка словарей, а это не то, что я хочу. Есть также некоторые проблемы с количеством значений на ключ. Есть идеи, как заставить это работать? Спасибо!

from joblib import Parallel, delayed

def dict_filter(k,v):
    if k in node_list:
        positions_sub[k] = v
    return positions_sub
positions_sub = Parallel(n_jobs=-1,)(delayed(dict_filter)(k,v)for k,v in positions.items())

1 Ответ

1 голос
/ 09 июля 2019

Прежде чем прибегнуть к распараллеливанию, убедитесь, что вы используете правильную структуру данных для каждой задачи: помните, что x in list по сути O(n), тогда как x in set (а также x in dict) больше похоже на O(1) , Поэтому простое преобразование node_list в set может значительно повысить производительность.

node_list = set(node_list)
positions_sub = {}
for k,v in positions.items():
    if k in node_list:
        positions_sub[k] = v

Еще одна вещь, которую следует учитывать, это соотношение между len(positions) и len(node_list). Если один из них значительно меньше другого, вы всегда должны перебирать меньший.


РЕДАКТИРОВАТЬ: некоторый код для сравнения производительности

import random
import timeit
import functools

def generate(n_positions=1000, n_node_list=100):
    positions = { i:i for i in random.sample(range(n_positions), n_positions) }
    node_list = random.sample(range(max(n_positions, n_node_list)), n_node_list)
    return positions, node_list  

def validate(variant):
    data = generate(1000, 100)
    if sorted(data[1]) != sorted(k for k in variant(*data)):
        raise Exception(f"{variant.__name__} failed")

def measure(variant, data, repeats=1000):
    total_seconds = timeit.Timer(functools.partial(variant, *data)).timeit(repeats)
    average_ms = total_seconds / repeats * 1000
    print(f"{variant.__name__:10s} took an average of {average_ms:0.2f}ms per pass over {repeats} passes" )   


def variant1(positions, node_list):
    positions_sub = {}
    for k,v in positions.items():
        if k in node_list:
            positions_sub[k] = v
    return positions_sub

def variant1b(positions, node_list):
    node_list = set(node_list)
    positions_sub = {}
    for k,v in positions.items():
        if k in node_list:
            positions_sub[k] = v
    return positions_sub

def variant2(positions, node_list):
    return {k:v for k,v in positions.items() if k in node_list}

def variant2b(positions, node_list):
    node_list = set(node_list)
    return {k:v for k,v in positions.items() if k in node_list}

def variant3(positions, node_list):
    return {k:positions[k] for k in node_list if k in positions}



if __name__ == "__main__":
    variants = [variant1,variant1b,variant2,variant2b,variant3]
    for variant in variants:
        validate(variant)      

    n_positions = 4000
    n_node_list = 1000
    n_repeats = 100
    data = generate(n_node_list, n_node_list)
    print(f"data generated with len(positions)={n_positions} and len(node_list)={n_node_list}")
    for variant in variants:
        measure(variant, data, n_repeats)

EDIT2: в соответствии с запросом, вот некоторые результаты на моей машине

first run:
data generated with len(positions)=4000 and len(node_list)=1000
variant1   took an average of 6.90ms per pass over 100 passes
variant1b  took an average of 0.22ms per pass over 100 passes
variant2   took an average of 6.95ms per pass over 100 passes
variant2b  took an average of 0.12ms per pass over 100 passes
variant3   took an average of 0.19ms per pass over 100 passes

second run:
data generated with len(positions)=40000 and len(node_list)=10000
variant1   took an average of 738.23ms per pass over 10 passes
variant1b  took an average of   2.04ms per pass over 10 passes
variant2   took an average of 739.51ms per pass over 10 passes
variant2b  took an average of   1.52ms per pass over 10 passes
variant3   took an average of   1.85ms per pass over 10 passes

Обратите внимание, что n=len(positions) и m=len(node_list) были выбраны таким образом, что соотношение n/m=4 приблизительно эквивалентно отношению исходных данных, которые были определены OP как 1,2M для n и 300K для m .

Обратите внимание на эффект увеличения в 10 раз от первого до второго запуска: где в первом запуске вариант1b примерно в 31 раз быстрее, чем вариант1, во втором - в 361 раз быстрее! Это ожидаемый результат уменьшения сложности k in node_list с O (m) до O (1). Общая временная сложность варианта 1 равна n * m = 0,25 * n ^ 2 = O (n ^ 2), тогда как вариант1b имеет только n * 1 = O (n). Это означает, что для каждого порядка величины n, который увеличивается n, вариант 1b также на порядок быстрее, чем вариант 1.

То, что аналогичное улучшение производительности может быть достигнуто только за счет распараллеливания, весьма сомнительно, так как в целом ожидаемый выигрыш в производительности от смущающей распараллеливаемой проблемы является кратным числа доступных ЦП, который по-прежнему является постоянным фактором и далеко не равен усилению улучшения алгоритма с O (n ^ 2) до O (n).

Кроме того, хотя ИМХО данная проблема относится к классу смущающих параллелизуемых задач, выходные данные должны быть агрегированы после параллельной обработки, прежде чем ее можно будет использовать. Кроме того, я совершенно незнаком с joblib, поэтому я пропустил добавление его в сравнение.

...