Добрый день всем.Я попытаюсь объяснить мою проблему, чтобы вы могли понять меня.
В нескольких местах я обнаружил, что Scala работает быстрее, чем Python:
Кроме того, сказано, что Scala является наиболее подходящим языком программирования для запуска приложений в Apache Spark:
https://www.dezyre.com/article/scala-vs-python-for-apache-spark/213
Однако на этом сайте другой пользователь (@Mrityunjay) задал вопрос, похожий на тот, который я предлагаю здесь:
Производительность Spark для Scala против Python
ВВ этом сообщении в ответе @ zero323 подчеркивается следующее:
- @ zero323 показывает большие различия в производительности программ, написанных на Scala, и программ, написанных на Python.
- @ zero323объясняет, как использование таких операций, как ReduceByKey, может существенно повлиять на производительность приложений Spark.
- @ zero323 заменяет операцию ReduceByKey на GroupByKey, поэтому он может повысить производительностьпрограммы, предложенной @ Mrityunjay.
В общем, объяснение ответа является исключительным, и очень похожее время выполнения достигается с помощью модификации @ zero323 между Scala и Python.
учитывая эту информацию, я поставил перед собой задачу написать простую программу, которая позволила бы мне объяснить аналогичную ситуацию, которая происходит со мной в моем приложении, подчеркнув, что мой код в Scala медленнее, чем код, написанный на Python.Для этого я избегал использования операций ReduceByKey и использовал только операции отображения.
Я постараюсь выполнить любую сверхсложную операцию, чтобы максимизировать занятость кластера (96 ядер, 48 ГБ ОЗУ) и добиться больших задержек.С этой целью код генерирует набор из 1 миллиона искусственных данных (с единственной целью вычисления времени выполнения обработки 1 миллиона данных, независимо от того, реплицируются ли они), которые содержат идентификатор идентификатора, вектор длины 10DoubleS.
Поскольку мое приложение реализовано с использованием DataFrame, я создал две программы в Scala, одну с использованием RDD, а другую с использованием DataFrame, с намерением наблюдать, если проблема заключается в использовании DataFrame.Аналогично, в Python была создана эквивалентная программа.
В общем, операция применяется к каждой записи RDD / DataFrame, результат которой помещается в дополнительное поле, в результате чего создается новый RDD / DataFrame, содержащий исходные поля иновое поле с результатом.
Это код в Scala:
import org.apache.spark.sql.SparkSession
import scala.math.BigDecimal
object RDDvsDFMapComparison {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("Test").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
val parts = 96
val repl = 1000000
val rep = 60000000
val ary = (0 until 10).toArray
val m = Array.ofDim[Int](repl, ary.length)
for (i <- 0 until repl)
m(i) = ary
val t1_start = System.nanoTime()
if (args(0).toInt == 0) {
val a1 = sc.parallelize(m, parts)
val b1 = a1.zipWithIndex().map(x => (x._2.toString, x._1)).toDF("Name", "Data")
val c1 = b1.map { x =>
val name = x.getString(0)
val data = x.getSeq[Int](1).toArray
var mean = 0.0
for (i <- 0 until rep)
mean += Math.exp(Math.log(data.sum) / Math.log(data.length))
(name, data, mean)
}.toDF("Name", "Data", "Mean")
val d1 = c1.take(5)
println(d1.deep.mkString(","))
} else {
val a1 = sc.parallelize(m, parts)
val b1 = a1.zipWithIndex().map(x => (x._2.toString, x._1))
val c1 = b1.map { x =>
val name = x._1
val data = x._2
var mean = 0.0
for (i <- 0 until rep)
mean += Math.exp(Math.log(data.sum) / Math.log(data.length))
(name, data, mean)
}
val d1 = c1.take(5)
println(d1.deep.mkString(","))
}
val t1_end = System.nanoTime()
val t1 = t1_end - t1_start
println("Map operation elapses: " + BigDecimal(t1.toDouble / 1000000000).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble.toString + " seconds.")
}
}
Это код в Python (намного проще):
#!/usr/bin/python
# -*- coding: latin-1 -*-
import sys
import time
import math
from pyspark import SparkContext, SparkConf
def myop(key, value):
s = 0.0
for j in range(r):
s += math.exp(math.log(sum(value)) / math.log(float(len(value))))
return (key, value, s)
if __name__ == "__main__":
conf = SparkConf().setAppName("rddvsdfmapcomparison")
sc = SparkContext(conf=conf)
parts = 96
repl = 1000000
r = 60000000
ary = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
m = []
for i in range(repl): m.append(ary)
start = time.time()
a2 = sc.parallelize(m, parts)
b2 = a2.zipWithIndex().map(lambda (value, key): (key, value))
c2 = b2.map(lambda (key, value): myop(key, value))
c2.count
d2 = c2.take(5)
print '[%s]' % ', '.join(map(str, d2))
end = time.time()
print 'Elapsed time is', round(end - start, 2), 'seconds'
sc.stop()
Результатыочень ясно.Программа, реализованная на Python, работает быстрее, чем любая другая, реализованная в Scala, с использованием RDD или DataFrame.Можно также заметить, что программа в RDD немного быстрее, чем программа в DataFrame, что согласованно из-за использования декодеров, извлекающих тип данных каждого поля записи DataFrame.
Вопрос в том,, Что я делаю неправильно?Разве Scala-код не быстрее, чем Python?Может ли кто-нибудь объяснить мне, что я делаю неправильно в своем коде?Ответ от @ zero323 очень хороший и иллюстративный, но я не могу понять, как такой простой код в Scala может работать медленнее, чем в Python.
Большое спасибо за то, что нашли время прочитать мой вопрос.