SparkSQL на кластере Raspberry Pi - PullRequest
0 голосов
/ 11 июля 2019

Мы пытаемся сравнить TPC-H (масштабный коэффициент 1) на кластере Raspberry Pi 3B + с 13 узлами (1 мастер, 12 рабочих). Каждый узел имеет 1 ГБ оперативной памяти и четырехъядерный процессор, работающий под управлением Ubuntu Server 18.04. В кластере используется автономный планировщик Spark с файлами * .tbl из инструмента TPCH dbgen, хранящимися в HDFS. Мы испытываем несколько сбоев при попытке выполнить запросы. Задания не выполняются непредсказуемо, обычно в веб-интерфейсе пользователя отображается один или несколько узлов «DEAD / LOST». Похоже, что один или несколько узлов «зависают» во время выполнения запроса и становятся недоступными / время ожидания. Мы включили наши параметры конфигурации, а также программу драйвера ниже. Любые рекомендации будут с благодарностью.

Файл конфигурации:

# spark-defaults.conf
spark.cores.max                    36
spark.cleaner.periodicGC.interval  5min
spark.driver.extraJavaOptions      -XX:+UseCompressedOops
spark.driver.memory                600m
spark.executor.cores               3
spark.executor.extraJavaOptions    -XX:+UseCompressedOops
spark.executor.heartbeatInterval   60s
spark.executor.memory              600m
spark.master                       spark://rpnmas:7077
spark.network.timeout              300s
spark.submit.deployMode            client
spark.reducer.maxSizeInFlight                8m
spark.sql.shuffle.partitions                 400
spark.sql.sort.enableRadixSort               false
spark.sql.inMemoryColumnarStorage.batchSize  10000
spark.rpc.message.maxSize                    32

Driver:

object TPCH {
  ...…<table schema>...

  def main(args: Array[String]): Unit = {
    val tabledir = args(0)
    val querydir = args(1)
    val numIters = args(2).toInt
    val spark = SparkSession.builder.appName("TPCH").getOrCreate

    //load tables from hdfs + cache
    for (table <- tables) {
      val path = tabledir + "/" + table.name + ".tbl"
      val df = spark.read.schema(table.schema).option("sep", "|").csv(path)
      df.createOrReplaceTempView(table.name)
      spark.catalog.cacheTable(table.name)
      }

    //load queries from text file
    val queries = (1 to 22).map { q =>
      val path = querydir + s"/$q.sql"
      val source = scala.io.Source.fromFile(path)
      val query = try source.mkString finally source.close()
      query
    }

    for ((query, i) <- queries.zipWithIndex) {
      for (j <- 1 to numIters) {
        val start = System.currentTimeMillis()
        spark.sql(query).collect
        val stop = System.currentTimeMillis()
        val time = (stop-start)/1000.0
        println(i + "," + j + "," + time)
      }
    }
  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...