Как правильно тестировать PySpark UDF - PullRequest
0 голосов
/ 22 октября 2019

Я бы хотел сравнить производительность UDF по одному с UDF Pandas. Вот мой код:

from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import SQLContext

sc.install_pypi_package("scipy")
sc.install_pypi_package("pandas")
sc.install_pypi_package("PyArrow==0.14.1")

import scipy as sp
import numpy as np
import pandas as pd
import itertools
import time
from scipy.stats import beta

def expand_grid(data_dict):
    rows = itertools.product(*data_dict.values())
    return pd.DataFrame.from_records(rows, columns=data_dict.keys())

np.random.seed(123)

gd = int(1e6)
grid = pd.DataFrame(data={'q': np.random.random(gd),
                          'a': np.random.random(gd),
                          'b': np.random.random(gd)})

#
# create spark data frame
#
grid_spark = spark.createDataFrame(grid)

#
# one-at-a-time
#
def qbeta(q,a,b):
    return beta.ppf(q,a,b).tolist()
qbeta_spark = F.udf(qbeta, DoubleType())

#
#  TIME one-at-a-time 
#
st = time.time()
x = grid_spark.select(qbeta_spark("q","a","b")).rdd.count()
time.time() - st

#
# Pandas
#
@F.pandas_udf("double", F.PandasUDFType.SCALAR)
def qbeta_scalar(q,a,b):
    return pd.Series(beta.ppf(q,a,b))

#
# TIME Pandas 
#
st = time.time() 
x = grid_spark.select(qbeta_scalar(grid_spark.quantile, grid_spark.mdr, grid_spark.sdr)).rdd.count()
time.time() - st

Мне известно о ленивой оценке в Spark, и что нужно запустить оценку фрейма данных. Есть ли что-то, что я делаю не так? Я заметил, что, если я запускаю скрипт во второй раз сразу после первой оценки, время будет намного ниже. Похоже, за сценой происходит какое-то кеширование. Как правильно тестировать PySpark UDF?

...