Я конвертирую ниже функции pyspark в Spark-Scala. В функции pyspark мы передаем в качестве аргумента значения (plat, metrics, perc)
и получение некоторого приблизительного значения для ввода.
Входные данные одинаковы как для кода, так и для Pyspark и scala.
код pyspark:
def give_percentile(plat,metrics,perc):
df_perc = df_final_main.filter(df_final_main.platform.like('%' + plat + '%'))
df_perc = df_perc.filter(df_final_main[metrics]!=0)
percentile_val = df_perc.approxQuantile(metrics, [perc],0.05)
if len(percentile_val)>0:
percentile_val = float(percentile_val[0])
else:
percentile_val = float(0)
return percentile_val
Метод вызова:
df_agg = sqlContext.createDataFrame(
[Row(platform='iOS',percentile_page_load_50=give_percentile("iOS","page_load",0.5)),
Row(platform='Android',percentile_page_load_50=give_percentile("Android","page_load",0.5)),
Row(platform='Web',percentile_page_load_50=give_percentile("Web","page_load",0.5))
]
)
Выходные значения как:
+-----------------------+--------+
|percentile_page_load_50|platform|
+-----------------------+--------+
| 0.0| iOS|
| 0.0| Android|
| 6.956999778747559|| Web|
+-----------------------+--------+
Код Spark-scala:
def give_percentile(plat: String, metrics:String, perc: Double): Double = {
var df_perc = df_final_main_join.filter(df_final_main_join.col("platform").like('%' + plat + '%'))
var df_perc1 = df_perc.filter(df_perc.col(metrics) =!= 0)
var apQuantile = df_perc.stat.approxQuantile(b, Array(c), 0.05)
if (apQuantile.length > 0){
var ret = apQuantile(0)
return ret
}
else {
var ret :Double=(0)
return ret
}
}
Метод вызова:
val schema = StructType(
Seq(
StructField("platform", StringType, true),
StructField("percentile_page_load_50", DoubleType, true),
StructField("percentile_page_load_80", DoubleType, true)
//StructField("percentile_page_load_90"), DoubleType, true)
)
)
val row1 = Seq(Row("Web",give_percentile("Web", "page_load", 0.5)),
Row("iSO",give_percentile("iSO", "page_load", 0.5)),
Row("Android",give_percentile("Android", "page_load", 0.5)))
val row_rdd = spark.sparkContext.parallelize(row1)
val rowss = sqlcontext.createDataFrame(row_rdd, schema)
Но здесь выдается другой вывод:
Выходные значения, такие как:
+-----------------------+--------+
|percentile_page_load_50|platform|
+-----------------------+--------+
| 0.0| iOS|
| 0.0| Android|
| 0.0| Web|
+-----------------------+--------+
Не могли бы вы помочь мне здесь и сказать, что не так в приведенном выше скала-коде?