Мы пытаемся сравнить TPC-H (масштабный коэффициент 1) на кластере Raspberry Pi 3B + с 13 узлами (1 мастер, 12 рабочих). Каждый узел имеет 1 ГБ оперативной памяти и четырехъядерный процессор, работающий под управлением Ubuntu Server 18.04. В кластере используется автономный планировщик Spark с файлами * .tbl из инструмента TPCH dbgen, хранящимися в HDFS. Мы испытываем несколько сбоев при попытке выполнить запросы. Задания не выполняются непредсказуемо, обычно в веб-интерфейсе пользователя отображается один или несколько узлов «DEAD / LOST». Похоже, что один или несколько узлов «зависают» во время выполнения запроса и становятся недоступными / время ожидания. Мы включили наши параметры конфигурации, а также программу драйвера ниже. Любые рекомендации будут с благодарностью.
Файл конфигурации:
# spark-defaults.conf
spark.cores.max 36
spark.cleaner.periodicGC.interval 5min
spark.driver.extraJavaOptions -XX:+UseCompressedOops
spark.driver.memory 600m
spark.executor.cores 3
spark.executor.extraJavaOptions -XX:+UseCompressedOops
spark.executor.heartbeatInterval 60s
spark.executor.memory 600m
spark.master spark://rpnmas:7077
spark.network.timeout 300s
spark.submit.deployMode client
spark.reducer.maxSizeInFlight 8m
spark.sql.shuffle.partitions 400
spark.sql.sort.enableRadixSort false
spark.sql.inMemoryColumnarStorage.batchSize 10000
spark.rpc.message.maxSize 32
Driver:
object TPCH {
...…<table schema>...
def main(args: Array[String]): Unit = {
val tabledir = args(0)
val querydir = args(1)
val numIters = args(2).toInt
val spark = SparkSession.builder.appName("TPCH").getOrCreate
//load tables from hdfs + cache
for (table <- tables) {
val path = tabledir + "/" + table.name + ".tbl"
val df = spark.read.schema(table.schema).option("sep", "|").csv(path)
df.createOrReplaceTempView(table.name)
spark.catalog.cacheTable(table.name)
}
//load queries from text file
val queries = (1 to 22).map { q =>
val path = querydir + s"/$q.sql"
val source = scala.io.Source.fromFile(path)
val query = try source.mkString finally source.close()
query
}
for ((query, i) <- queries.zipWithIndex) {
for (j <- 1 to numIters) {
val start = System.currentTimeMillis()
spark.sql(query).collect
val stop = System.currentTimeMillis()
val time = (stop-start)/1000.0
println(i + "," + j + "," + time)
}
}
}
}