Я бы хотел сравнить производительность UDF по одному с UDF Pandas. Вот мой код:
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import SQLContext
sc.install_pypi_package("scipy")
sc.install_pypi_package("pandas")
sc.install_pypi_package("PyArrow==0.14.1")
import scipy as sp
import numpy as np
import pandas as pd
import itertools
import time
from scipy.stats import beta
def expand_grid(data_dict):
rows = itertools.product(*data_dict.values())
return pd.DataFrame.from_records(rows, columns=data_dict.keys())
np.random.seed(123)
gd = int(1e6)
grid = pd.DataFrame(data={'q': np.random.random(gd),
'a': np.random.random(gd),
'b': np.random.random(gd)})
#
# create spark data frame
#
grid_spark = spark.createDataFrame(grid)
#
# one-at-a-time
#
def qbeta(q,a,b):
return beta.ppf(q,a,b).tolist()
qbeta_spark = F.udf(qbeta, DoubleType())
#
# TIME one-at-a-time
#
st = time.time()
x = grid_spark.select(qbeta_spark("q","a","b")).rdd.count()
time.time() - st
#
# Pandas
#
@F.pandas_udf("double", F.PandasUDFType.SCALAR)
def qbeta_scalar(q,a,b):
return pd.Series(beta.ppf(q,a,b))
#
# TIME Pandas
#
st = time.time()
x = grid_spark.select(qbeta_scalar(grid_spark.quantile, grid_spark.mdr, grid_spark.sdr)).rdd.count()
time.time() - st
Мне известно о ленивой оценке в Spark, и что нужно запустить оценку фрейма данных. Есть ли что-то, что я делаю не так? Я заметил, что, если я запускаю скрипт во второй раз сразу после первой оценки, время будет намного ниже. Похоже, за сценой происходит какое-то кеширование. Как правильно тестировать PySpark UDF?