We have a use case where we need to run concurrent Spark SQL queries against a single Spark session through a REST API (Akka HTTP).
Application Conf
my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    // fixed-pool-size is available in Akka 2.4.2+
    fixed-pool-size = 4
  }
  throughput = 100
}
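For context, here is a minimal sketch (an illustration only, not part of the project code) of how such a dispatcher is typically used: it is resolved by name from application.conf, and blocking Spark calls are wrapped in Futures running on its dedicated pool instead of Akka's default dispatcher.
import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}

object DispatcherSketch {
  val system = ActorSystem("sketch")
  // Resolves the my-blocking-dispatcher block from application.conf.
  implicit val blocking: ExecutionContext =
    system.dispatchers.lookup("my-blocking-dispatcher")

  // Any long blocking call (e.g. session.sql(...).show()) belongs here,
  // so the 4 pool threads absorb it rather than the default dispatcher.
  def submit(work: () => Unit): Future[Unit] = Future(work())
}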
Spark Service
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.CustomSqlListener
import scala.concurrent.ExecutionContext

trait SparkService {
  // Single shared session; the FAIR scheduler lets concurrent queries
  // share resources instead of queueing FIFO.
  val session = SparkSession
    .builder()
    .config("spark.scheduler.mode", "FAIR")
    .appName("QueryCancellation")
    .master("local[*]")
    .enableHiveSupport()
    .getOrCreate()

  var queryJobMapStart = Map[String, String]()
  var queryStatusMap = Map[String, String]()

  session.sparkContext.setLogLevel("ERROR")
  session.sparkContext.setCallSite("Reading the file")

  val dataDF = session.read
    .format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("C:\\dev\\QueryCancellation\\src\\main\\resources\\Baby_Names__Beginning_2007.csv")
  dataDF.createOrReplaceTempView("data_tbl")

  // Custom listener that tracks the query-to-job mapping and job status.
  val customListener = new CustomSqlListener(session.sparkContext.getConf, queryJobMapStart, queryStatusMap)
  session.sparkContext.addSparkListener(customListener)

  def runQuery(query: String, queryId: String)(implicit ec: ExecutionContext) = {
    // Tag the jobs spawned by this statement so the listener can map them back.
    session.sparkContext.setLocalProperty("callSite.short", queryId)
    session.sparkContext.setLocalProperty("callSite.long", query)
    session.sql(query).show(2)
  }
}
object SparkService extends SparkService
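Since the goal is per-query cancellation, an alternative I have considered is Spark's job-group API (setJobGroup / cancelJobGroup, both available in Spark 2.2) instead of tagging the call site. The sketch below is my assumption of such an approach, not the implementation actually used above.
import org.apache.spark.sql.SparkSession

object CancellableQuery {
  // Tag every job spawned by a statement with its queryId, so it can be
  // cancelled later without touching unrelated jobs in the same session.
  def runQuery(session: SparkSession, query: String, queryId: String): Unit = {
    session.sparkContext.setJobGroup(queryId, query, interruptOnCancel = true)
    try session.sql(query).show(2)
    finally session.sparkContext.clearJobGroup()
  }

  // Cancels all jobs tagged with this queryId.
  def killQuery(session: SparkSession, queryId: String): Unit =
    session.sparkContext.cancelJobGroup(queryId)
}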
Query Service
import java.util.UUID
import akka.actor.ActorSystem
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer

trait QueryService extends SparkService {
  implicit val system: ActorSystem
  implicit val materializer: ActorMaterializer

  // Dedicated pool for blocking Spark calls (configured in application.conf).
  implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")

  // getStatus and killQuery are provided by the listener plumbing (not shown here).
  val route: Route =
    pathSingleSlash {
      get {
        complete {
          "welcome to rest service"
        }
      }
    } ~
    path("runQuery" / "county" / Segment) { county =>
      get {
        complete {
          val documentId = "user ::" + UUID.randomUUID().toString
          val queryId = System.nanoTime().toString
          val stmt = "select a.sex,count(*) from data_tbl a,data_tbl b " +
            "where b.county=a.county and a.county='" + county + "' group by a.sex"
          runQuery(stmt, queryId)
          s"Query : $stmt is submitted. Query id is $queryId. User id is $documentId"
        }
      }
    } ~
    path("getStatus" / """[\w-]+""".r) { id =>
      get {
        complete {
          getStatus(id) match {
            case Some(result) => s"Status for query id : $id is $result"
            case None => s"Could not find the status of the query id : $id"
          }
        }
      }
    } ~
    path("killQuery" / """[\w-]+""".r) { id =>
      get {
        complete {
          killQuery(id)
          s"Query id $id is cancelled."
        }
      }
    }
}
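Note that the runQuery route above calls Spark synchronously inside complete, so the HTTP response waits for the action to finish. A sketch of submitting asynchronously on the blocking dispatcher instead is shown below; this is my illustration, and the runQueryAsync endpoint name plus the simplified statement are hypothetical.
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import scala.concurrent.Future

trait AsyncQueryRoute { this: QueryService =>
  val asyncRoute: Route =
    path("runQueryAsync" / "county" / Segment) { county =>
      get {
        val queryId = System.nanoTime().toString
        val stmt = s"select sex, count(*) from data_tbl where county = '$county' group by sex"
        // Fire-and-forget on my-blocking-dispatcher; reply immediately
        // with the id the client can poll via getStatus.
        Future(runQuery(stmt, queryId))
        complete(s"Query $queryId submitted.")
      }
    }
}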
Query Server
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer

class QueryServer(implicit val system: ActorSystem,
                  implicit val materializer: ActorMaterializer) extends QueryService {
  def startServer(address: String, port: Int) = {
    Http().bindAndHandle(route, address, port)
  }
}

object QueryServer extends App {
  implicit val actorSystem = ActorSystem("query-server")
  implicit val materializer = ActorMaterializer()
  val server = new QueryServer()
  server.startServer("localhost", 8080)
  println("running server at localhost 8080")
}
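Once the server is running, the endpoints are exercised with plain GET requests: http://localhost:8080/runQuery/county/KINGS to submit a query, then http://localhost:8080/getStatus/<queryId> and http://localhost:8080/killQuery/<queryId> with the id returned by the first call.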
When I run a query against Spark SQL via http://localhost:8080/runQuery/county/'KINGS', several job ids are created, most of which are marked as skipped.
Below is a screenshot of the Spark UI. I cannot understand why the highlighted jobs are created.
Below is the console log, which shows that the query was executed only once:
"running server at localhost 8080
173859599588358->2
****************************************************************************************
****************************************************************************************
Job id 2 is completed
--------------------------------------------------------
173859599588358->3
****************************************************************************************
****************************************************************************************
Job id 3 is completed
--------------------------------------------------------
173859599588358->4
****************************************************************************************
****************************************************************************************
Job id 4 is completed
--------------------------------------------------------
173859599588358->5
****************************************************************************************
****************************************************************************************
Job id 5 is completed
--------------------------------------------------------
173859599588358->6
****************************************************************************************
****************************************************************************************
Job id 6 is completed
--------------------------------------------------------
173859599588358->7
****************************************************************************************
****************************************************************************************
Job id 7 is completed
--------------------------------------------------------
+---+--------+
|sex|count(1)|
+---+--------+
| F|12476769|
| M|12095080|
+---+--------+
Spark version: 2.2.1