Как оптимизировать потоковую агрегацию с сокетом источника данных? - PullRequest
0 голосов
/ 23 декабря 2018

Я использую Spark 2.4.0 с Scala 2.11 на 4 процессорных ядрах и 8 потоках.

Я написал следующее приложение:

package demos.spark

object WordCounter {

  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession
      .builder
      .master("local[4]")
      .getOrCreate
    import spark.implicits._
    spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load
      .as[String]
      .flatMap(_.split("\\W+"))
      .groupBy("value")
      .count
      .writeStream
      .outputMode("complete")
      .format("console")
      .start
      .awaitTermination
  }
}

Время обработки для приложения с local[1] занимает ~ 60 секунд.Для local[8] оно уменьшается до ~ 15 секунд, и это минимальное значение, которое я когда-либо получал.

Я всегда посылаю одно или два предложения через сокет в качестве ввода.

Это ожидаемоеповедение?Как оптимизировать приложение, чтобы оно занимало 1 секунду?

РЕДАКТИРОВАТЬ: После долгих часов, потраченных на эту проблему, наконец-то у меня есть решение.Проблема заключалась в слишком большом количестве разделов (несколько сотен), которые по умолчанию использовались Spark.После добавления опции spark.sql.shuffle.partitions, равной 8 (количество ядер на моем компьютере), продолжительность обработки данных была уменьшена до 300-400 мс

val spark = SparkSession
  .builder
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", 8)
  .getOrCreate

Я пока не знаю, если это число должно быть постоянным или нет, что если приложение Spark будет работать на инфраструктуре, которая может изменяться (Spark, Kubernetes, AWS, автоматическое масштабирование)?

1 Ответ

0 голосов
/ 26 декабря 2018

4 ядра ЦП и 8 потоков.

Используйте local[*], и Spark будет использовать столько потоков обработки, сколько имеется ядер, т.е. 4. Если эти 8 потоков являются виртуальными ядрами,Spark увидит 8 «ядер ЦП», поэтому максимальное количество потоков для обработки равно 8.

И это именно то, что доказали ваши тесты, т. Е.

Для local[8] оно падаетдо ~ 15 секунд, и это минимальное значение, которое я когда-либо получал.

Это ожидаемое поведение?

Да, и вряд ли можно превзойти время, если вы не измените обработкулогика, то есть сам структурированный запрос.Вот где я обычно говорю, чтобы подумать об алгоритме (который может отличаться в зависимости от обрабатываемых данных).Вы ограничены количеством доступных ядер ЦП.

Как оптимизировать приложение, чтобы иметь время обработки в 1 секунду?

Изменить структурированный запрос ("алгоритм ") или как он работает под прикрытием.

Следующие операции являются логикой обработки:

.flatMap(_.split("\\W+"))
.groupBy("value")
.count

flatMap дешево и может получить так же быстро, как и процессорсердечники.Вы не можете ничего с этим поделать.

Вы также используете потоковую агрегацию groupBy, за которой следует count, которая изменяет количество задач, необходимых для выполнения (в вашем случае это будет от 8 доколичество случайных разделов по умолчанию, т. е. 200).

Вы можете посчитать количество тактов ЦП, необходимое для запуска 200 задач на 8 ядрах, и вам потребуется столько времени для вычисления результата.

Проблема заключалась в слишком большом количестве разделов (несколько сотен), которые по умолчанию использовались Spark.После добавления опции spark.sql.shuffle.partitions, равной 8 (количество ядер на моем компьютере), продолжительность обработки данных была уменьшена до 300-400 мс

Конечно, это помогло в этомчастный случай, и это нормально, если это единственное оборудование, которое у вас может быть.Вы закончили.

А как насчет других сред, в которых количество ядер может значительно увеличиться?

, если это число должно быть постоянным или нет, что если приложение Spark будет работатьоб инфраструктуре, которая может меняться (Spark, Kubernetes, AWS, автоматическое масштабирование)?

Это самый сложный вопрос.Добро пожаловать в очень динамичный / настраиваемый мир Apache Spark.Есть так много факторов, которые влияют на конечный результат, поэтому часто то, что у вас есть, это то, что вы должны получить, или вы начинаете настраивать много опций конфигурации , и вам придется тратить часы или недели, чтобы понять, что лучшеКонфигурация должна быть.Подумайте о различных данных, которые будет обрабатывать ваш потоковый запрос (форма данных, объем и скорость).Это добавляет путаницы.

В консультативной шапке, в какой-то момент вам нужно будет решить, достаточно ли высока производительность приложения или вы потратите недели, надеясь, что вы сможете добиться большего успеха, чем уже было.достигнуто (и кто-то должен за это заплатить).

, если это число должно быть постоянным или нет

Должно, если вы знаете все данные, которые вы когда-либо будете обрабатывать,Вы можете сделать такое жесткое предположение.

Это не должно быть вообще, и поэтому Spark дает вам Адаптивное выполнение запросов ( видео ).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...