среда: spark
4.4 + k8s
Я много использовал mapPartitions, всегда думаю, что код в mapPartitions
или forPartitions
выполняется на исполнителе jvm, но в этом случае он конфликтует . Перед использованием sliding
он регистрируется в модуле исполнителя, но после скольжения я нашел журнал драйвера.
Не знаю почему?
Кто-нибудь может это объяснить (на самом деле я действительно хочу, чтобы он выполнялся на исполнителе), Спасибо ..
val cleanedDF = df
.groupBy(_.id)
.mapPartitions {
par =>
val rows = par.map {
case (id, it) =>
// read from redis, if not exists, create a new object and save to redis at the same time
val status: Option[Status] = CacheDB.readXorWriteNx[Status](id)
val series = it.toList
val goldp2p = series
.map { r =>
//the following found on executor pod
logger.debug("before_sliding||key={}||time={}|host={}", id, r(1).time, java.net.InetAddress.getLocalHost().getHostName())
r
}
.sliding(2, 1)
.map { r =>
//the following found on driverpod
logger.debug("after_sliding||key={}||time={}||host={}", id, r(1).time, java.net.InetAddress.getLocalHost().getHostName())
r
}.filter(_.size >= 2)
.map{p2p=>
// update status according some conditions; all the updates happen on driver
if{???} status.num = 1
else if {???} status.num = 2
....
}
//save to redis,however the following log found on executor again; but the result in redis is the initiated status
logger.debug("key={}||value={}||host={}", id, status, java.net.InetAddress.getLocalHost().getHostName())
CacheDB.write[Status](id, status)
и, наконец, я изменил свой код вот так: Все в порядке, выполняется на исполнителе и результат такой, какой я хочу.
(0 until series.size - 1).map {
i =>
List(series(i), series(i + 1))
}...