Я хотел бы измерить производительность PySpark при моделировании Pi на моем локальном рабочем столе в Windows
Я использую Python 3.5
Я заметил, что если я использую numpy внутри функциивремя запуска spark worker среднее время работы этой функции зависит от количества ядер, в то время как оно постоянно без числа
Я использую jupyter для запуска теста
Сначала я импортирую свои пакеты
import findspark
findspark.init()
import pyspark
from pyspark import SparkConf, SparkContext
import random
import pandas as pd
from IPython.display import display, HTML
import time
import pandas as pd
import numpy as np
from itertools import product
import matplotlib.pyplot as plt
определение моей функции numpy, которая оценивает Pi
def pi_eval_np(nb_evaluation):
start = time.time()
b = 1000000
np.random.seed(1)
r = np.random.rand(b, 2)
x = r[:,0]
y = r[:,1]
idx = np.sqrt(x**2 + y**2) < 1
pi = (np.sum(idx).astype('double')/b*4)
stop = time.time()
return pi, stop - start
Общая функция для запуска spark
def spark_evaluator(nb_cores, size_range, lambda_function):
dict_output = {}
dict_output["number of cores"] = nb_cores
dict_output["size of range"] = size_range
# Spark configuration
conf = (SparkConf()
.setMaster("local[" + str(nb_cores) + "]")
.setAppName("spark eval")
.set("spark.executor.memory", "4g"))
# Generate spark Context
start = time.time()
sc = SparkContext(conf = conf)
stop = time.time()
dict_output["spark Context"] = stop - start
#load array onto Spark workers
start = time.time()
chunks = sc.parallelize(range(0, size_range))
stop = time.time()
dict_output["spark parallelize"] = stop - start
#map evaluation
#Warm-up
output = chunks.map(lambda_function).collect()
start = time.time()
output = chunks.map(lambda_function).collect()
stop = time.time()
dict_output["total evaluation"] = stop - start
dict_output["mean lambda function"] = np.mean([x[1] for x in output])
# stop spark Context
sc.stop()
return dict_output
Затем я могу запустить свой оценщик с несколькими видами конфигураций:
#Scenario
cores_scenarii = [1, 2, 4, 8]
range_scenarii = [1000]
#run all scenarii
result_pi_np = []
for core, size_range in product(cores_scenarii, range_scenarii):
result_pi_np.append(spark_evaluator(nb_cores = core, size_range = size_range, lambda_function = pi_eval_np))
#transform list of dict to dict of list
output_pi_scenarii_np = pd.DataFrame(result_pi_np).to_dict('list')
#output in pandas
display(pd.DataFrame(output_pi_scenarii_np))
Вывод:
<table border="1" class="dataframe">
<thead>
<tr style="text-align: right;">
<th></th>
<th>mean lambda function</th>
<th>number of cores</th>
<th>size of range</th>
<th>spark Context</th>
<th>spark parallelize</th>
<th>total evaluation</th>
</tr>
</thead>
<tbody>
<tr>
<th>0</th>
<td>0.067703</td>
<td>1</td>
<td>1000</td>
<td>0.116012</td>
<td>0.005000</td>
<td>70.474194</td>
</tr>
<tr>
<th>1</th>
<td>0.063604</td>
<td>2</td>
<td>1000</td>
<td>0.135212</td>
<td>0.015601</td>
<td>34.119039</td>
</tr>
<tr>
<th>2</th>
<td>0.065864</td>
<td>4</td>
<td>1000</td>
<td>0.143411</td>
<td>0.001000</td>
<td>20.587668</td>
</tr>
<tr>
<th>3</th>
<td>0.081089</td>
<td>8</td>
<td>1000</td>
<td>0.134608</td>
<td>0.005001</td>
<td>18.336296</td>
</tr>
</tbody>
</table>
Как вы можете заметить, средняя лямбда-функция не постоянна, когда я увеличиваю количество ядер (хорошо с 1,2,4 ядрами, проблема с 8 ядрами) (у меня естьтаймер в функции pi для вычисления среднего времени процесса в конце цикла)
Принимая во внимание, что если я использую стандартную функцию без numpy, время будет постоянным, например:
def pi_eval(void):
start = time.time()
b = 10000
in_circle = 0
for i in range(0, b):
x, y = random.random(), random.random()
r = (x*x + y*y) < 1
in_circle += 1 if r else 0
pi = float(in_circle)/float(b)*4
stop = time.time()
return pi, stop - start
Есть ли у вас идеи, почему numpy вводит такие накладные расходы?Как вы думаете, это может быть связано с тем, что нам нужно загрузить C dll процессом?если да, то почему?