PySpark с NumPy: нестабильность производительности - PullRequest
0 голосов
/ 05 июня 2018

Я хотел бы измерить производительность PySpark при моделировании Pi на моем локальном рабочем столе в Windows

Я использую Python 3.5

Я заметил, что если я использую numpy внутри функциивремя запуска spark worker среднее время работы этой функции зависит от количества ядер, в то время как оно постоянно без числа

Я использую jupyter для запуска теста

Сначала я импортирую свои пакеты

import findspark
findspark.init()
import pyspark
from pyspark import SparkConf, SparkContext
import random
import pandas as pd
from IPython.display import display, HTML
import time
import pandas as pd
import numpy as np
from itertools import product
import matplotlib.pyplot as plt

определение моей функции numpy, которая оценивает Pi

def pi_eval_np(nb_evaluation):
   start = time.time()
   b = 1000000
   np.random.seed(1)
   r = np.random.rand(b, 2)
   x = r[:,0]
   y = r[:,1]
   idx = np.sqrt(x**2 + y**2) < 1
   pi = (np.sum(idx).astype('double')/b*4)
   stop = time.time()
   return pi, stop - start

Общая функция для запуска spark

def spark_evaluator(nb_cores, size_range, lambda_function):

    dict_output = {}
    dict_output["number of cores"] = nb_cores
    dict_output["size of range"] = size_range
    # Spark configuration
    conf = (SparkConf()
         .setMaster("local[" + str(nb_cores) + "]")
         .setAppName("spark eval")
         .set("spark.executor.memory", "4g"))

    # Generate spark Context
    start = time.time()
    sc = SparkContext(conf = conf)
    stop = time.time()

    dict_output["spark Context"] = stop - start

    #load array onto Spark workers
    start = time.time()
    chunks = sc.parallelize(range(0, size_range))
    stop = time.time()

    dict_output["spark parallelize"] = stop - start

    #map evaluation
    #Warm-up
    output = chunks.map(lambda_function).collect()

    start = time.time()
    output = chunks.map(lambda_function).collect()
    stop = time.time()

    dict_output["total evaluation"] = stop - start
    dict_output["mean lambda function"] = np.mean([x[1] for x in output])

    # stop spark Context
    sc.stop()


    return dict_output

Затем я могу запустить свой оценщик с несколькими видами конфигураций:

#Scenario 
cores_scenarii = [1, 2, 4, 8]
range_scenarii = [1000]

#run all scenarii
result_pi_np = []
for core, size_range in product(cores_scenarii, range_scenarii):
    result_pi_np.append(spark_evaluator(nb_cores = core, size_range = size_range, lambda_function = pi_eval_np))

#transform list of dict to dict of list
output_pi_scenarii_np = pd.DataFrame(result_pi_np).to_dict('list')

#output in pandas
display(pd.DataFrame(output_pi_scenarii_np))

Вывод:

<table border="1" class="dataframe">
  <thead>
    <tr style="text-align: right;">
      <th></th>
      <th>mean lambda function</th>
      <th>number of cores</th>
      <th>size of range</th>
      <th>spark Context</th>
      <th>spark parallelize</th>
      <th>total evaluation</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <th>0</th>
      <td>0.067703</td>
      <td>1</td>
      <td>1000</td>
      <td>0.116012</td>
      <td>0.005000</td>
      <td>70.474194</td>
    </tr>
    <tr>
      <th>1</th>
      <td>0.063604</td>
      <td>2</td>
      <td>1000</td>
      <td>0.135212</td>
      <td>0.015601</td>
      <td>34.119039</td>
    </tr>
    <tr>
      <th>2</th>
      <td>0.065864</td>
      <td>4</td>
      <td>1000</td>
      <td>0.143411</td>
      <td>0.001000</td>
      <td>20.587668</td>
    </tr>
    <tr>
      <th>3</th>
      <td>0.081089</td>
      <td>8</td>
      <td>1000</td>
      <td>0.134608</td>
      <td>0.005001</td>
      <td>18.336296</td>
    </tr>
  </tbody>
</table>

Как вы можете заметить, средняя лямбда-функция не постоянна, когда я увеличиваю количество ядер (хорошо с 1,2,4 ядрами, проблема с 8 ядрами) (у меня естьтаймер в функции pi для вычисления среднего времени процесса в конце цикла)

Принимая во внимание, что если я использую стандартную функцию без numpy, время будет постоянным, например:

def pi_eval(void):
   start = time.time()
   b = 10000
   in_circle = 0
   for i in range(0, b):
      x, y = random.random(), random.random()
      r = (x*x + y*y) < 1
      in_circle += 1 if r else 0

   pi = float(in_circle)/float(b)*4
   stop = time.time()
   return pi, stop - start

Есть ли у вас идеи, почему numpy вводит такие накладные расходы?Как вы думаете, это может быть связано с тем, что нам нужно загрузить C dll процессом?если да, то почему?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...