Потоки
python на самом деле не помогут вам обрабатывать параллельно, поскольку они выполняются в одном и том же «реальном потоке процессора», потоки python полезны, когда вы имеете дело с асинхронными вызовами HTTP
О программе ProcessPoolExecutor
из документов :
concurrent.futures.ProcessPoolExecutor ()
Класс ProcessPoolExecutor является подклассом Executor, который использует пул процессов для выполнения звонки асинхронно. ProcessPoolExecutor использует многопроцессорный модуль, который позволяет обойти блокировку Global Interpreter Lock, но также означает, что только отобранные объекты могут быть выполнены и возвращены.
это может помочь вам, если вам требуется высокая загрузка ЦП, Вы можете использовать:
import concurrent
def manipulate_values(k_v):
k, v = k_v
return_values = []
for i in v :
new_value = i ** 2 - 13
return_values.append(new_value)
return k, return_values
with concurrent.futures.ProcessPoolExecutor() as executor:
example_dict = dict(executor.map(manipulate_values, example_dict1.items()))
. Вот простой тест, использующий простой for
l oop для обработки ваших данных по сравнению с ProcessPoolExecutor
, мой сценарий предполагает, что для каждого элемента для обработки вам нужно ~ 50 мс ЦП:
вы можете увидеть реальную выгоду от ProcessPoolExecutor
, если время ЦП для обрабатываемого элемента составляет high
from simple_benchmark import BenchmarkBuilder
import time
import concurrent
b = BenchmarkBuilder()
def manipulate_values1(k_v):
k, v = k_v
time.sleep(0.05)
return k, v
def manipulate_values2(v):
time.sleep(0.05)
return v
@b.add_function()
def test_with_process_pool_executor(d):
with concurrent.futures.ProcessPoolExecutor() as executor:
return dict(executor.map(manipulate_values1, d.items()))
@b.add_function()
def test_simple_for_loop(d):
for key, value in d.items():
d[key] = manipulate_values2((key, value))
@b.add_arguments('Number of keys in dict')
def argument_provider():
for exp in range(2, 10):
size = 2**exp
yield size, {i: [i] * 10_000 for i in range(size)}
r = b.run()
r.plot()
, если вы не установите число работников для ProcessPoolExecutor число работников по умолчанию будет равно числу процессоров на вашей машине (для теста, который я использовал для c с 8 CPU).
, но в вашем случае, с данными, указанными в вашем вопросе, для обработки 1 элемента потребуется ~ 3 мкс:
%timeit manipulate_values([367, 30, 847, 482, 887, 654, 347, 504, 413, 821])
2.32 µs ± 25.8 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
in в этом случае тест будет смотрите:
Так что лучше использовать простое значение для l oop, если время процессора для обработки одного элемента мало.
Хороший вопрос, поднятый @ user3666197, - это случай, когда у вас огромные элементы / списки, я сравнил оба подхода, используя 1_000_000_000
случайные числа в списке:
, как вы можете видеть в этом случае, более подходит для использования ProcessPoolExecutor
from simple_benchmark import BenchmarkBuilder
import time
import concurrent
from random import choice
b = BenchmarkBuilder()
def manipulate_values1(k_v):
k, v = k_v
return_values = []
for i in v:
new_value = i ** 2 - 13
return_values.append(new_value)
return k, return_values
def manipulate_values2(v):
return_values = []
for i in v:
new_value = i ** 2 - 13
return_values.append(new_value)
return return_values
@b.add_function()
def test_with_process_pool_executor(d):
with concurrent.futures.ProcessPoolExecutor() as executor:
return dict(executor.map(manipulate_values1, d.items()))
@b.add_function()
def test_simple_for_loop(d):
for key, value in d.items():
d[key] = manipulate_values2(value)
@b.add_arguments('Number of keys in dict')
def argument_provider():
for exp in range(2, 5):
size = 2**exp
yield size, {i: [choice(range(1000)) for _ in range(1_000_000)] for i in range(size)}
r = b.run()
r.plot()
, поскольку для обработки одного элемента требуется ~ 209 мс:
l = [367] * 1_000_000
%timeit manipulate_values2(l)
# 209 ms ± 1.45 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Тем не менее, самым быстрым вариантом будет использование numpy .arrays с решением for
l oop:
from simple_benchmark import BenchmarkBuilder
import time
import concurrent
import numpy as np
b = BenchmarkBuilder()
def manipulate_values1(k_v):
k, v = k_v
return k, v ** 2 - 13
def manipulate_values2(v):
return v ** 2 - 13
@b.add_function()
def test_with_process_pool_executor(d):
with concurrent.futures.ProcessPoolExecutor() as executor:
return dict(executor.map(manipulate_values1, d.items()))
@b.add_function()
def test_simple_for_loop(d):
for key, value in d.items():
d[key] = manipulate_values2(value)
@b.add_arguments('Number of keys in dict')
def argument_provider():
for exp in range(2, 7):
size = 2**exp
yield size, {i: np.random.randint(0, 1000, size=1_000_000) for i in range(size)}
r = b.run()
r.plot()
ожидается, что простой for
l oop будет быстрее, поскольку для обработки одного numpy .array требуется <1ms: </p>
def manipulate_value2( input_list ):
return input_list ** 2 - 13
l = np.random.randint(0, 1000, size=1_000_000)
%timeit manipulate_values2(l)
# 951 µs ± 5.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)