Spark: Почему Python значительно превосходит Scala в моем случае использования? - PullRequest
16 голосов
/ 23 февраля 2020

Чтобы сравнить производительность Spark при использовании Python и Scala, я создал одно и то же задание на обоих языках и сравнил время выполнения. Я ожидал, что обе работы займут примерно одинаковое количество времени, но Python работа заняла только 27min, а Scala работа заняла 37min (почти на 40% дольше!). Я реализовал ту же работу в Java, и это заняло 37minutes. Как это возможно, что Python намного быстрее?

Минимальный проверяемый пример:

Python задание:

# Configuration
conf = pyspark.SparkConf()
conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
conf.set("spark.executor.instances", "4")
conf.set("spark.executor.cores", "8")
sc = pyspark.SparkContext(conf=conf)

# 960 Files from a public dataset in 2 batches
input_files = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312025.20/warc/CC-MAIN-20190817203056-20190817225056-00[0-5]*"
input_files2 = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312128.3/warc/CC-MAIN-20190817102624-20190817124624-00[0-3]*"

# Count occurances of a certain string
logData = sc.textFile(input_files)
logData2 = sc.textFile(input_files2)
a = logData.filter(lambda value: value.startswith('WARC-Type: response')).count()
b = logData2.filter(lambda value: value.startswith('WARC-Type: response')).count()

print(a, b)

Scala задание:

// Configuration
config.set("spark.executor.instances", "4")
config.set("spark.executor.cores", "8")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

// 960 Files from a public dataset in 2 batches 
val input_files = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312025.20/warc/CC-MAIN-20190817203056-20190817225056-00[0-5]*"
val input_files2 = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312128.3/warc/CC-MAIN-20190817102624-20190817124624-00[0-3]*"

// Count occurances of a certain string
val logData1 = sc.textFile(input_files)
val logData2 = sc.textFile(input_files2)
val num1 = logData1.filter(line => line.startsWith("WARC-Type: response")).count()
val num2 = logData2.filter(line => line.startsWith("WARC-Type: response")).count()

println(s"Lines with a: $num1, Lines with b: $num2")

Просто глядя на код, они кажутся идентичными. Я посмотрел на группы DAG, и они не предоставили никакой информации (или, по крайней мере, у меня нет ноу-хау, чтобы придумать объяснения, основанные на них).

Буду очень признателен за любые указатели.

Ответы [ 2 ]

11 голосов
/ 25 февраля 2020

Ваше базовое c предположение, что Scala или Java должно быть быстрее для этой конкретной задачи c, просто неверно. Вы можете легко проверить это с минимальными локальными приложениями. Scala one:

import scala.io.Source
import java.time.{Duration, Instant}

object App {
  def main(args: Array[String]) {
    val Array(filename, string) = args

    val start = Instant.now()

    Source
      .fromFile(filename)
      .getLines
      .filter(line => line.startsWith(string))
      .length

    val stop = Instant.now()
    val duration = Duration.between(start, stop).toMillis
    println(s"${start},${stop},${duration}")
  }
}

Python one

import datetime
import sys

if __name__ == "__main__":
    _, filename, string = sys.argv
    start = datetime.datetime.now()
    with open(filename) as fr:
        # Not idiomatic or the most efficient but that's what
        # PySpark will use
        sum(1 for _ in filter(lambda line: line.startswith(string), fr))

    end = datetime.datetime.now()
    duration = round((end - start).total_seconds() * 1000)
    print(f"{start},{end},{duration}")

Результаты (300 повторений каждый, Python 3.7.6, Scala 2.11.12), вкл. Posts.xml из дамп данных hermeneutics.stackexchange.com со смесью совпадающих и несоответствующих шаблонов:

boxplots of durartion in millis for above programs

  • Python 273.50 (258,84, 288,16)
  • Scala 634,13 (533,81, 734,45)

Как видите, Python не только систематически быстрее, но и более согласованно ( более низкий спред).

Убрать сообщение - не верьте необоснованным FUD - языки могут работать быстрее или медленнее в определенных c задачах или в определенных c средах (например, здесь Scala может ударить запуск JVM и / или G C и / или JIT), но если вы говорите, что «XYZ - это X4 быстрее» или «XYZ медленнее по сравнению с ZYX (..) Приблизительно, 10x «Обычно это означает, что кто-то написал действительно плохой код для проверки».

Edit :

Для решения некоторых проблем, поднятых в комментарии:

  • В коде OP данные передаются в основном в одном направлении (JVM -> Python), и никакой реальной сериализации не требуется (этот конкретный c путь просто проходит строку байтов как есть и декодирует на UTF-8 с другой стороны). Это настолько дешево, насколько это возможно, когда дело доходит до «сериализации».
  • То, что передается обратно, представляет собой одно целое число на раздел, поэтому в этом направлении влияние незначительно.
  • Связь завершена локальные сокеты (все взаимодействие на рабочем месте за пределами первоначального подключения и аутентификации выполняется с использованием дескриптора файла , возвращенного из local_connect_and_auth, и его не что иное, как связанный с сокетом файл ). Опять же, как дешево, как это бывает, когда дело доходит до связи между процессами.
  • Учитывая разницу в производительности, показанную выше (намного выше, чем вы видите в вашей программе), существует большой запас накладных расходов, перечисленных выше. .
  • Этот случай полностью отличается от случаев, когда простые или сложные объекты должны передаваться интерпретатору Python и обратно в форме, доступной для обеих сторон в качестве совместимых с рассылкой дампов (наиболее примечательные примеры включают UDF старого стиля, некоторые части MLLib старого стиля).

Редактировать 2 :

Так как jasper-m был обеспокоен о стоимости запуска здесь легко можно доказать, что Python все еще имеет значительное преимущество перед Scala, даже если размер ввода значительно увеличен.

Вот результаты для 2003360 строк / 5.6G (тот же самый ввод, просто дублируется несколько раз, 30 повторений), что превосходит все, что вы можете ожидать в одной задаче Spark.

enter image description here* 106 5 *

  • Python 22809,57 (21466,26, 24152,87)
  • Scala 27315,28 (24367,24, 30263,31)

Обратите внимание на непересекающиеся доверительные интервалы.

Редактировать 3 :

По адресу другой комментарий от Jasper-M:

Основная масса всей обработки все еще происходит внутри JVM в случае Spark.

Это просто неверно в данном конкретном случае:

  • Речь идет о задании на карту с одним глобальным сокращением с использованием PyDpark RDD.
  • PySpark RDD (в отличие, скажем, DataFrame) изначально реализует брутто-функциональность в Python, с вводом-выводом исключений и связью между узлами.
  • Поскольку это одноэтапное задание, и конечный вывод достаточно мал, чтобы его можно было проигнорировать. Основная ответственность JVM (если это было придирчиво, это реализовано в основном в Java, а не Scala) - это запускать формат ввода Had oop и pu sh данные через файл сокета в Python.
  • Часть чтения идентична для JVM и Python API, поэтому ее можно рассматривать как постоянные издержки. Он также не может рассматриваться как большая часть обработки , даже для такой простой работы, как эта.
4 голосов
/ 24 февраля 2020

Задание Scala занимает больше времени, поскольку имеет неверную конфигурацию, и поэтому задания Python и Scala были предоставлены с неравными ресурсами.

В коде есть две ошибки:

val sc = new SparkContext(config) // LINE #1
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
sc.hadoopConfiguration.set("spark.executor.instances", "4") // LINE #4
sc.hadoopConfiguration.set("spark.executor.cores", "8") // LINE #5
  1. LINE 1. После того, как линия была выполнена, конфигурация ресурса задания Spark уже установлена ​​и исправлена. С этого момента нет возможности что-либо настроить. Ни количество исполнителей, ни количество ядер на исполнителя.
  2. ЛИНИЯ 4-5. sc.hadoopConfiguration - это неправильное место для установки любой конфигурации Spark. Он должен быть установлен в config экземпляре, который вы передаете new SparkContext(config).

[ДОБАВЛЕНО]. Учитывая вышесказанное, я бы предложил изменить код задания Scala на

config.set("spark.executor.instances", "4")
config.set("spark.executor.cores", "8")
val sc = new SparkContext(config) // LINE #1
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

и повторите его снова. Бьюсь об заклад, версия Scala будет теперь в X раз быстрее.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...