Как определить оптимальное количество потоков в приложении Spark? - PullRequest
0 голосов
/ 29 января 2020

В моем приложении Scala / Spark я пытаюсь правильно использовать многопроцессорность. Как видно из кода ниже, количество потоков равно количеству элементов в массиве storage. Я проверил текущий код, и он работает. Но, как вы можете видеть, в массиве storage есть только 2 элемента. Мне кажется, что если в массиве будет большое количество элементов, возникнут проблемы. В моем случае я не знаю, сколько элементов будет в массиве в будущем. Возможно, мне следует ограничить количество потоков и запускать новые потоки только при обработке предыдущих.

Вопрос: Как определить оптимальное количество потоков?

Main.app:

import org.apache.spark.sql.DataFrame
import utils.CustomThread

object MainApp {
    def main(args: Array[String]): Unit = {
      // Create the main DataFrame with all information.
      var baseDF: DataFrame = spark.read.option("delimiter", "|").csv("/path_to_the_files/")

      // Cache the main DataFrame.
      baseDF.persist(StorageLevel.MEMORY_AND_DISK)

      // The first time DataFrame is computed in an action, it will be kept in memory on the nodes.
      baseDF.count()

      // Create arrays with the different identifiers
      var array1 = Array("6fefc487-bd57-4fa2-808a-3845703b83d0", "9baba76b-07c2-48ec-a153-6cfb8b138ecf")
      var array2 = Array("ab654369-77f5-478c-94e5-ee2755ae8571", "3b43e0a6-deba-4919-a2cc-9d450e28e0fe")
      var storage = Array(array1, array2)

      // Check if the main DataFrame is empty or not.
      if (baseDF.head(1).nonEmpty) {
        for (item <- storage) {
          val thread = new Thread(new CustomThread(baseDF, item))
          thread.start()
        }
      }
    }
}

CustomThread. scala:

package utils

import org.apache.spark.sql.DataFrame

class CustomThread(baseDF: DataFrame, item: Array[String]) extends Runnable {
  override def run(): Unit = {
    val df = baseDF.filter(col("col1").isin(item:_*))

    println("Count: " + df.count())
  }
}

Я использую такие конфигурации:

spark.serializer: org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max.mb: 1024
spark.executor.memory: 2g
spark.sql.autoBroadcastJoinThreshold: -1
spark.sql.files.ignoreCorruptFiles: true
spark.driver.memory: 30g
spark.driver.maxResultSize: 20g
spark.executor.cores: 1
spark.cores.max: 48
spark.scheduler.mode: FAIR

1 Ответ

0 голосов
/ 29 января 2020

Чего вы хотите достичь с помощью многопоточности? В искровом приложении вам не нужно беспокоиться о количестве потоков и т. Д. c. Ваш код выполняет запуск параллельных заданий (многопоточность только на драйвере), для исполнителей разницы нет.

По моему опыту, я использую параллельные запуски заданий, только если у меня несколько небольшие или искаженные задания, например ресурсы кластера, используются не полностью. Если я это сделаю, я использую скалярные параллельные коллекции.

Чтобы ответить на ваш вопрос: оптимальное число, если потоки, вероятно, 1

РЕДАКТИРОВАТЬ: я бы предложил переписать ваш код полностью с целью иметь все результаты в новом фрейме данных, это лучше, чем реализация сложной многопоточности:

// testcase
val baseDf = Seq(
  "6fefc487-bd57-4fa2-808a-3845703b83d0",
  "9baba76b-07c2-48ec-a153-6cfb8b138ecf",
  "ab654369-77f5-478c-94e5-ee2755ae8571",
  "dummy"
).toDF("col1")

var array1 = Seq("6fefc487-bd57-4fa2-808a-3845703b83d0", "9baba76b-07c2-48ec-a153-6cfb8b138ecf")
var array2 = Seq("ab654369-77f5-478c-94e5-ee2755ae8571", "3b43e0a6-deba-4919-a2cc-9d450e28e0fe")
var storage = Seq(array1, array2)

broadcast(storage.toDF("storage"))
  .join(baseDf,array_contains($"storage",$"col1"),"left")
  .groupBy($"storage").agg(count($"col1").as("count"))
  .show()

дает:

+--------------------+-----+
|             storage|count|
+--------------------+-----+
|[ab654369-77f5-47...|    1|
|[6fefc487-bd57-4f...|    2|
+--------------------+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...