Как распараллелить вычисления по словарю списков «большие данные»? - PullRequest
1 голос
/ 09 марта 2020

У меня есть вопрос, касающийся выполнения вычислений для словаря python ---- в этом случае словарь содержит миллионы ключей, и списки также длинные. Кажется, есть разногласие, можно ли здесь использовать распараллеливание, поэтому я задам вопрос здесь более подробно. Вот оригинальный вопрос:

Оптимизация парсинга массивного python словаря, многопоточность

Это игрушечный (маленький) python словарь:

example_dict1 = {'key1':[367, 30, 847, 482, 887, 654, 347, 504, 413, 821],
    'key2':[754, 915, 622, 149, 279, 192, 312, 203, 742, 846], 
    'key3':[586, 521, 470, 476, 693, 426, 746, 733, 528, 565]}

Допустим, мне нужно проанализировать значения списков, которые я реализовал в следующую простую (игрушечную) функцию:

def manipulate_values(input_list):
    return_values = []
    for i in input_list:
        new_value = i ** 2 - 13
        return_values.append(new_value)
    return return_values

Теперь я могу легко анализировать значения этого словаря следующим образом:

for key, value in example_dict1.items():
    example_dict1[key] = manipulate_values(value)

, что приводит к следующему:

example_dict1 = {'key1': [134676, 887, 717396, 232311, 786756, 427703, 120396, 254003, 170556, 674028], 
     'key2': [568503, 837212, 386871, 22188, 77828, 36851, 97331, 41196, 550551, 715703], 
     'key3': [343383, 271428, 220887, 226563, 480236, 181463, 556503, 537276, 278771, 319212]}

Вопрос: Почему я не мог использовать несколько потоков для этого вычисления, например, три потока, один для key1, key2 и key3? Будет ли concurrent.futures.ProcessPoolExecutor() работать здесь?

Оригинальный вопрос: есть ли лучшие способы оптимизировать этот дубль, чтобы он был быстрым?

Ответы [ 2 ]

2 голосов
/ 09 марта 2020
Потоки

python на самом деле не помогут вам обрабатывать параллельно, поскольку они выполняются в одном и том же «реальном потоке процессора», потоки python полезны, когда вы имеете дело с асинхронными вызовами HTTP

О программе ProcessPoolExecutor из документов :

concurrent.futures.ProcessPoolExecutor ()

Класс ProcessPoolExecutor является подклассом Executor, который использует пул процессов для выполнения звонки асинхронно. ProcessPoolExecutor использует многопроцессорный модуль, который позволяет обойти блокировку Global Interpreter Lock, но также означает, что только отобранные объекты могут быть выполнены и возвращены.

это может помочь вам, если вам требуется высокая загрузка ЦП, Вы можете использовать:

import concurrent


def manipulate_values(k_v):
    k, v = k_v
    return_values = []
    for i in v :
        new_value = i ** 2 - 13
        return_values.append(new_value)
    return k, return_values


with concurrent.futures.ProcessPoolExecutor() as executor:
        example_dict = dict(executor.map(manipulate_values, example_dict1.items()))

. Вот простой тест, использующий простой for l oop для обработки ваших данных по сравнению с ProcessPoolExecutor, мой сценарий предполагает, что для каждого элемента для обработки вам нужно ~ 50 мс ЦП:

enter image description here

вы можете увидеть реальную выгоду от ProcessPoolExecutor, если время ЦП для обрабатываемого элемента составляет high

from simple_benchmark import BenchmarkBuilder
import time
import concurrent

b = BenchmarkBuilder()

def manipulate_values1(k_v):
    k, v = k_v
    time.sleep(0.05)
    return k, v

def manipulate_values2(v):
    time.sleep(0.05)
    return v

@b.add_function()
def test_with_process_pool_executor(d):
    with concurrent.futures.ProcessPoolExecutor() as executor:
            return dict(executor.map(manipulate_values1, d.items()))

@b.add_function()       
def test_simple_for_loop(d):
    for key, value in d.items():
        d[key] = manipulate_values2((key, value))


@b.add_arguments('Number of keys in dict')
def argument_provider():
    for exp in range(2, 10):
        size = 2**exp
        yield size, {i: [i] * 10_000 for i in range(size)}

r = b.run()
r.plot()

, если вы не установите число работников для ProcessPoolExecutor число работников по умолчанию будет равно числу процессоров на вашей машине (для теста, который я использовал для c с 8 CPU).


, но в вашем случае, с данными, указанными в вашем вопросе, для обработки 1 элемента потребуется ~ 3 мкс:

%timeit manipulate_values([367, 30, 847, 482, 887, 654, 347, 504, 413, 821])
2.32 µs ± 25.8 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

in в этом случае тест будет смотрите: enter image description here

Так что лучше использовать простое значение для l oop, если время процессора для обработки одного элемента мало.


Хороший вопрос, поднятый @ user3666197, - это случай, когда у вас огромные элементы / списки, я сравнил оба подхода, используя 1_000_000_000 случайные числа в списке:

enter image description here

, как вы можете видеть в этом случае, более подходит для использования ProcessPoolExecutor

from simple_benchmark import BenchmarkBuilder
import time
import concurrent
from random import choice

b = BenchmarkBuilder()

def manipulate_values1(k_v):
    k, v = k_v
    return_values = []
    for i in v:
        new_value = i ** 2 - 13
        return_values.append(new_value)

    return k, return_values

def manipulate_values2(v):
    return_values = []
    for i in v:
        new_value = i ** 2 - 13
        return_values.append(new_value)
    return return_values

@b.add_function()
def test_with_process_pool_executor(d):
    with concurrent.futures.ProcessPoolExecutor() as executor:
            return dict(executor.map(manipulate_values1, d.items()))

@b.add_function()       
def test_simple_for_loop(d):
    for key, value in d.items():
        d[key] = manipulate_values2(value)


@b.add_arguments('Number of keys in dict')
def argument_provider():
    for exp in range(2, 5):
        size = 2**exp
        yield size, {i: [choice(range(1000)) for _ in range(1_000_000)] for i in range(size)}

r = b.run()
r.plot()

, поскольку для обработки одного элемента требуется ~ 209 мс:

l = [367] * 1_000_000
%timeit manipulate_values2(l)
# 209 ms ± 1.45 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Тем не менее, самым быстрым вариантом будет использование numpy .arrays с решением for l oop:

enter image description here

from simple_benchmark import BenchmarkBuilder
import time
import concurrent
import numpy as np

b = BenchmarkBuilder()

def manipulate_values1(k_v):
    k, v = k_v
    return k,  v ** 2 - 13

def manipulate_values2(v):
    return v ** 2 - 13

@b.add_function()
def test_with_process_pool_executor(d):
    with concurrent.futures.ProcessPoolExecutor() as executor:
            return dict(executor.map(manipulate_values1, d.items()))

@b.add_function()       
def test_simple_for_loop(d):
    for key, value in d.items():
        d[key] = manipulate_values2(value)


@b.add_arguments('Number of keys in dict')
def argument_provider():
    for exp in range(2, 7):
        size = 2**exp
        yield size, {i: np.random.randint(0, 1000, size=1_000_000) for i in range(size)}

r = b.run()
r.plot()

ожидается, что простой for l oop будет быстрее, поскольку для обработки одного numpy .array требуется <1ms: </p>

def manipulate_value2( input_list ):
    return input_list ** 2 - 13

l = np.random.randint(0, 1000, size=1_000_000)
%timeit manipulate_values2(l)
# 951 µs ± 5.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
1 голос
/ 09 марта 2020

Q : " Почему не может использовать несколько потоков для этого вычисления, например, три потока, один для ключа1, ключа2 и ключа3? "

Вы могли бы, но без какого-либо разумного влияния на производительность - знание всех деталей о том, как python обрабатывает поток выполнения на основе потоков, является здесь кардинальным. Узнайте о трюке с GIL-блокировкой , используемом прямо для него избегая любой параллельной обработки и ее влияния на производительность вы получите WHY - part.

Q : " Будет ли concurrent.futures.ProcessPoolExecutor() работать здесь?"

Будет ли .

Однако его net -эффект (если он «быстрее» , чем чистый [SERIAL] поток обработки) будет зависеть от заданного размера "больших" -списков (как предупреждено: (cit.) "миллионы ключей, и списки одинаково длинные." выше), что должен быть скопирован (RAM-I / O) и передан (SER / DES-обработано + IP C -передан) в пул порожденных (основанных на процессах) удаленных исполнителей.

Эти многократно повторяющиеся ОЗУ Вскоре будут преобладать накладные расходы дополнительных модулей I / O + SER / DES.

Шаг копирования ОЗУ:

>>> from zmq import Stopwatch; aClk = Stopwatch()

>>> aClk.start(); aList = [ i for i in range( int( 1E4 ) ) ]; aClk.stop()
   1345 [us] to copy a List of 1E4 elements
>>> aClk.start(); aList = [ i for i in range( int( 1E5 ) ) ]; aClk.stop()
  12776 [us] to copy a List of 1E5 elements
>>> aClk.start(); aList = [ i for i in range( int( 1E6 ) ) ]; aClk.stop()
 149197 [us] to copy a List of 1E6 elements
>>> aClk.start(); aList = [ i for i in range( int( 1E7 ) ) ]; aClk.stop()
1253792 [us] to copy a List of 1E7 elements
|  |::: [us]
|  +--- [ms]
+------ [ s]

Шаг SER / DES:

>>> import pickle
>>> aClk.start(); _ = pickle.dumps( aList ); aClk.stop()
 608323 
 615851
 638821 [us] to copy pickle.dumps() a List of 1E7 elements
|  |::: [us]
|  +--- [ms]
+------ [ s]

Таким образом, ожидаемые накладные расходы для каждой партии составляют ~ 2 x ( 1253 + 608 ) [ms] + IP C -передача стоит всего один выстрел из 1E7-предметов

Фактическая полезная нагрузка manipulate_values() составляет настолько мала, что единовременная сумма всех дополнительных затрат вряд ли покроет дополнительные расходы, связанные с распределением рабочих единиц по пулу удаленных работников. Гораздо более разумных результатов следует ожидать от векторизованных форм вычислений. Стоимость надстройки здесь намного больше, чем небольшой объем полезной работы.

Чем больше схема будет зависеть от накладных расходов, связанных с передачей параметров SER / DES "туда" плюс дополнительные расходы SER / DES на возвращаемые результаты «назад» - все это в совокупности будет зависеть от net -эффекта ( anti -speedups << 1.0 x довольно часто наблюдаются в сценариях использования, представленных с плохой инженерной практикой на стороне дизайна, никакие поздние контрольные показатели не могут спасти уже сожженных человеко-дней, потраченных на такое плохое проектное решение)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...