Spark Открытие нескольких потоков для одного задания при попытке запустить параллельные задания - PullRequest
0 голосов
/ 18 мая 2018

У нас есть сценарий использования, в котором мы должны были запускать параллельные запросы sql spark для одного сеанса spark через rest-api (akka http).

Application Conf

my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    // or in Akka 2.4.2+
    fixed-pool-size = 4
  }
  throughput = 100
} 

Spark Service

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}
import org.apache.spark.sql.execution.ui.CustomSqlListener
import org.apache.spark.sql.{Row, SparkSession}
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future}
import scala.util.parsing.json.JSON

trait SparkService {

  val session = SparkSession
                .builder()
                .config("spark.scheduler.mode", "FAIR")
                .appName("QueryCancellation")
                .master("local[*]")
                .enableHiveSupport()
                .getOrCreate()

  var queryJobMapStart = Map[String, String]()
  var queryStatusMap = Map[String,String]()
  session.sparkContext.setLogLevel("ERROR")
  session.sparkContext.setCallSite("Reading the file")
  val dataDF = session.read
      .format("csv")
      .option("inferSchema","true")
      .option("header","true")
      .load("C:\\dev\\QueryCancellation\\src\\main\\resources\\Baby_Names__Beginning_2007.csv")



  dataDF.createOrReplaceTempView("data_tbl")


  //dataDF.printSchema()

  val customListener = new CustomSqlListener(session.sparkContext.getConf,queryJobMapStart,queryStatusMap)
  val appListener = session.sparkContext.addSparkListener(customListener)


  def runQuery(query : String, queryId: String)(implicit  ec : ExecutionContext)=  {


  //  println("queryId: "+ queryId +" query:" + query)
    session.sparkContext.setLocalProperty("callSite.short",queryId)
    session.sparkContext.setLocalProperty("callSite.long",query)

     session.sql(query).show(2)
    //Thread.sleep(60000)
   // Future(data)

  }

}

object SparkService extends SparkService 

Сервис запросов

import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import akka.actor.ActorSystem
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
trait QueryService extends SparkService {
  implicit val system: ActorSystem
  implicit val materializer : ActorMaterializer
  // implicit val sparkSession: SparkSession
  // val datasetMap = new ConcurrentHashMap[String, Dataset[Row]]()
  implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")
  val route: Route =
    pathSingleSlash {
      get {
        complete {
          "welcome to rest service"
        }
      }
    } ~
      path("runQuery" / "county"/Segment) { county  =>
        get {
          complete{
            var res= ""
            val documentId = "user ::" + UUID.randomUUID().toString
            val queryId = System.nanoTime().toString
            val stmt = "select a.sex,count(*) from data_tbl a,data_tbl b where b.county=a.county and a.country= '"+county+"' group by a.sex"
            val result = runQuery(stmt,queryId)
            /* var entity = queryResult match {
               case Some(result) =>s"Query : $stmt  is submitted. Query id is $result. User id is $documentId"
               case None => s"Query : $stmt could not be submitted. User id is $documentId"
             }*/
            /*result.onComplete{
              case Success(value) => println(s"Query completed")
              case Failure(e) =>  None
            }*/
            var entity = s"Query : $stmt  is submitted. Query id is $queryId. User id is $documentId"
            entity
          }
        }
      } ~
      path("getStatus" / """[\w[0-9]-_]+""".r) { id =>
        get {
          complete {
            val statusResult = getStatus(id)
            var res = statusResult match {
              case Some(result) =>  s"Status for query id : $id is $result"
              case None =>  s"Could not find the status of the query id : $id"
            }
            res
          }
        }
      } ~
      path("killQuery" / """[\w[0-9]-_]+""".r) { id =>
        get {
          complete {
            val statusResult = killQuery(id)
            s"Query id $id is cancelled."
          }
        }
      }
}

Сервер запросов

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer

import scala.concurrent.Future
//import scala.concurrent.ExecutionContext.Implicits.global

class QueryServer (implicit val system:ActorSystem ,
                   implicit val materializer: ActorMaterializer) extends QueryService {

  def startServer(address : String, port: Int) = {
      Http().bindAndHandle(route,address,port)
  }

}

    object QueryServer extends App {

      implicit val actorSystem = ActorSystem("query-server")
      implicit val materializer = ActorMaterializer()
      val server = new QueryServer()
      server.startServer("localhost",8080)
      println("running server at localhost 8080")

    }

Когда я пытаюсь выполнить запрос в Spark SQL через http:localhost:8080/runQuery/county/'KINGS', создается несколько идентификаторов заданий, из которых максимально пропускается.

Ниже приведен снимок экранаSpark UI.Я не могу понять, почему создаются выделенные задачи.

enter image description here

Ниже приведен журнал консоли, который показывает, что задание было выполнено только один раз:

"running server at localhost 8080
173859599588358->2
****************************************************************************************
****************************************************************************************
Job id 2 is completed
--------------------------------------------------------
173859599588358->3
****************************************************************************************
****************************************************************************************
Job id 3 is completed
--------------------------------------------------------
173859599588358->4
****************************************************************************************
****************************************************************************************
Job id 4 is completed
--------------------------------------------------------
173859599588358->5
****************************************************************************************
****************************************************************************************
Job id 5 is completed
--------------------------------------------------------
173859599588358->6
****************************************************************************************
****************************************************************************************
Job id 6 is completed
--------------------------------------------------------
173859599588358->7
****************************************************************************************
****************************************************************************************
Job id 7 is completed
--------------------------------------------------------
+---+--------+
|sex|count(1)|
+---+--------+
|  F|12476769|
|  M|12095080|
+---+--------+

Версия Spark:- 2.2.1

Ответы [ 2 ]

0 голосов
/ 18 мая 2018

Похоже, оптимизатор искрового катализатора оптимизирует запрос.Создано более одной группы доступности базы данных и, возможно, выбирается лучший план выполнения.Вы можете увидеть план выполнения, и их может быть несколько.Я думаю, что здесь нет никакого отношения к akka http.Попробуйте запустить код в оболочке spark и можете подтвердить претензию.

0 голосов
/ 18 мая 2018

Партия option("inferSchema","true") запустит задание искры в дополнение к остальной логике.Подробнее см. Spark API .

Из-за того, что задания недолговечны, они могут быть последовательными, а не параллельными (это говорит с самого начала).Посмотрели ли вы вкладку SQL пользовательского интерфейса Spark.Возможно, что все эти задания (кроме одного, выводящего схему) являются частью одного выполненного SQL-запроса.

...