Теоретически, Scala быстрее, чем Python для Apache Spark.На практике это не так.В чем дело? - PullRequest
0 голосов
/ 09 октября 2018

Добрый день всем.Я попытаюсь объяснить мою проблему, чтобы вы могли понять меня.

В нескольких местах я обнаружил, что Scala работает быстрее, чем Python:

Кроме того, сказано, что Scala является наиболее подходящим языком программирования для запуска приложений в Apache Spark:

https://www.dezyre.com/article/scala-vs-python-for-apache-spark/213

Однако на этом сайте другой пользователь (@Mrityunjay) задал вопрос, похожий на тот, который я предлагаю здесь:

Производительность Spark для Scala против Python

ВВ этом сообщении в ответе @ zero323 подчеркивается следующее:

  1. @ zero323 показывает большие различия в производительности программ, написанных на Scala, и программ, написанных на Python.
  2. @ zero323объясняет, как использование таких операций, как ReduceByKey, может существенно повлиять на производительность приложений Spark.
  3. @ zero323 заменяет операцию ReduceByKey на GroupByKey, поэтому он может повысить производительностьпрограммы, предложенной @ Mrityunjay.

В общем, объяснение ответа является исключительным, и очень похожее время выполнения достигается с помощью модификации @ zero323 между Scala и Python.

учитывая эту информацию, я поставил перед собой задачу написать простую программу, которая позволила бы мне объяснить аналогичную ситуацию, которая происходит со мной в моем приложении, подчеркнув, что мой код в Scala медленнее, чем код, написанный на Python.Для этого я избегал использования операций ReduceByKey и использовал только операции отображения.

Я постараюсь выполнить любую сверхсложную операцию, чтобы максимизировать занятость кластера (96 ядер, 48 ГБ ОЗУ) и добиться больших задержек.С этой целью код генерирует набор из 1 миллиона искусственных данных (с единственной целью вычисления времени выполнения обработки 1 миллиона данных, независимо от того, реплицируются ли они), которые содержат идентификатор идентификатора, вектор длины 10DoubleS.

Поскольку мое приложение реализовано с использованием DataFrame, я создал две программы в Scala, одну с использованием RDD, а другую с использованием DataFrame, с намерением наблюдать, если проблема заключается в использовании DataFrame.Аналогично, в Python была создана эквивалентная программа.

В общем, операция применяется к каждой записи RDD / DataFrame, результат которой помещается в дополнительное поле, в результате чего создается новый RDD / DataFrame, содержащий исходные поля иновое поле с результатом.

Это код в Scala:

import org.apache.spark.sql.SparkSession
import scala.math.BigDecimal

object RDDvsDFMapComparison {
  def main(args: Array[String]) {

    val spark = SparkSession.builder().appName("Test").getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._

    val parts = 96
    val repl = 1000000
    val rep = 60000000

    val ary = (0 until 10).toArray
    val m = Array.ofDim[Int](repl, ary.length)
    for (i <- 0 until repl)
      m(i) = ary

    val t1_start = System.nanoTime()
    if (args(0).toInt == 0) {
      val a1 = sc.parallelize(m, parts)
      val b1 = a1.zipWithIndex().map(x => (x._2.toString, x._1)).toDF("Name", "Data")
      val c1 = b1.map { x =>
        val name = x.getString(0)
        val data = x.getSeq[Int](1).toArray
        var mean = 0.0
        for (i <- 0 until rep)
          mean += Math.exp(Math.log(data.sum) / Math.log(data.length))
        (name, data, mean)
      }.toDF("Name", "Data", "Mean")
      val d1 = c1.take(5)
      println(d1.deep.mkString(","))
    } else {
      val a1 = sc.parallelize(m, parts)
      val b1 = a1.zipWithIndex().map(x => (x._2.toString, x._1))
      val c1 = b1.map { x =>
        val name = x._1
        val data = x._2
        var mean = 0.0
        for (i <- 0 until rep)
          mean += Math.exp(Math.log(data.sum) / Math.log(data.length))
        (name, data, mean)
      }
      val d1 = c1.take(5)
      println(d1.deep.mkString(","))
    }
    val t1_end = System.nanoTime()
    val t1 = t1_end - t1_start
    println("Map operation elapses: " + BigDecimal(t1.toDouble / 1000000000).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble.toString + " seconds.")
  }
}

Это код в Python (намного проще):

#!/usr/bin/python
# -*- coding: latin-1 -*-

import sys
import time
import math
from pyspark import SparkContext, SparkConf

def myop(key, value):
  s = 0.0
  for j in range(r):
    s += math.exp(math.log(sum(value)) / math.log(float(len(value))))
  return (key, value, s)

if __name__ == "__main__":
  conf = SparkConf().setAppName("rddvsdfmapcomparison")
  sc = SparkContext(conf=conf)
  parts = 96
  repl = 1000000
  r = 60000000
  ary = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  m = []
  for i in range(repl): m.append(ary)
  start = time.time()
  a2 = sc.parallelize(m, parts)
  b2 = a2.zipWithIndex().map(lambda (value, key): (key, value))
  c2 = b2.map(lambda (key, value): myop(key, value))
  c2.count
  d2 = c2.take(5)
  print '[%s]' % ', '.join(map(str, d2))
  end = time.time()
  print 'Elapsed time is', round(end - start, 2), 'seconds'
  sc.stop()

Результатыочень ясно.Программа, реализованная на Python, работает быстрее, чем любая другая, реализованная в Scala, с использованием RDD или DataFrame.Можно также заметить, что программа в RDD немного быстрее, чем программа в DataFrame, что согласованно из-за использования декодеров, извлекающих тип данных каждого поля записи DataFrame.

Вопрос в том,, Что я делаю неправильно?Разве Scala-код не быстрее, чем Python?Может ли кто-нибудь объяснить мне, что я делаю неправильно в своем коде?Ответ от @ zero323 очень хороший и иллюстративный, но я не могу понять, как такой простой код в Scala может работать медленнее, чем в Python.

Большое спасибо за то, что нашли время прочитать мой вопрос.

1 Ответ

0 голосов
/ 09 октября 2018

Попробуйте эту реализацию в Scala.Это быстрее:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("Test").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._

val parts = 96
val repl = 1000000
val rep = 20000

val m = Vector.tabulate(repl, 10)((_,i) => i)

val myop = udf( (value: Seq[Int]) =>
  (0 until rep).foldLeft(0.0) {(acc,_)=>
    acc + Math.exp(Math.log(value.sum) / Math.log(value.length))
  }
)

val c1 = sc.parallelize(m, parts)
  .toDF("Data")
  .withColumn("Name",monotonically_increasing_id())
  .withColumn("Mean",myop('Data))

c1.count()
val d1 = c1.take(5)
println(d1.deep.mkString(","))

Это может быть даже чище, я думаю, если бы я понял, что на самом деле выполняет функция myop.

Редактировать:

Как упомянуто в комментарии @ user6910411, эта реализация быстрее только потому, что она делает то же самое, что и код Python (пропуская большую часть вычислений).Оригинальные реализации Scala и Python, представленные в вопросе, не равны.

...