используя скольжение в mapPartitions, но результат выполняется в драйвере - PullRequest
0 голосов
/ 04 августа 2020

среда: spark 4.4 + k8s

Я много использовал mapPartitions, всегда думаю, что код в mapPartitions или forPartitions выполняется на исполнителе jvm, но в этом случае он конфликтует . Перед использованием sliding он регистрируется в модуле исполнителя, но после скольжения я нашел журнал драйвера.

Не знаю почему?

Кто-нибудь может это объяснить (на самом деле я действительно хочу, чтобы он выполнялся на исполнителе), Спасибо ..


val cleanedDF = df
 .groupBy(_.id)  
 .mapPartitions {
   par => 
     val rows = par.map {
       case (id, it) =>
         // read from redis, if not exists, create a new object and save to redis at the same time 
         val status: Option[Status] = CacheDB.readXorWriteNx[Status](id)
         val series = it.toList  
         val goldp2p = series
          .map { r => 
             //the following found on executor pod
                 logger.debug("before_sliding||key={}||time={}|host={}", id, r(1).time, java.net.InetAddress.getLocalHost().getHostName())
                 r
           }
          .sliding(2, 1)
          .map { r =>
             //the following found on driverpod
             logger.debug("after_sliding||key={}||time={}||host={}", id, r(1).time, java.net.InetAddress.getLocalHost().getHostName())
          r
          }.filter(_.size >= 2)
          .map{p2p=>
             // update status according some conditions; all the updates happen on driver
             if{???} status.num = 1
             else if {???} status.num = 2
             ....
          }
         //save to redis,however the following log found on executor again; but the result in redis is the initiated status
         logger.debug("key={}||value={}||host={}", id, status, java.net.InetAddress.getLocalHost().getHostName())
         CacheDB.write[Status](id, status) 

и, наконец, я изменил свой код вот так: Все в порядке, выполняется на исполнителе и результат такой, какой я хочу.

       (0 until series.size - 1).map {
                    i =>
                      List(series(i), series(i + 1))
                  }...

...