I'm building a Spark job, and the relevant part of the code looks like this:
var withBansDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
// schema is previously and correctly defined
for (id <- championIds) {
  val bansDf = dataframe.
    where(array_contains($"team1_bans", id.getInt(0)) || array_contains($"team2_bans", id.getInt(0))).
    groupBy($"patch_game_version").count().
    withColumn("champion_id", typedLit(id.getInt(0))).
    withColumnRenamed("count", "banned_count")
  val banrateDf = bansDf.
    join(gamesByPatchDf, Seq("patch_game_version")).
    withColumn("ban_rate", $"banned_count" / $"games_count")
  val championBanRatesDf = inProgress.join(broadcast(banrateDf), Seq("champion_id", "patch_game_version"))
  withBansDf = withBansDf.union(championBanRatesDf)
}
The loop completes, but when I try to run any operation on the withBansDf DataFrame, such as show or count, the process runs for a long time and the Spark UI reports that the job finishes after a while (10-15 minutes). However, there is no result: no count is returned, and the DataFrame is never displayed.
I assume this is because my job is inefficient and/or is running into memory problems. If I run the same block of code inside the for loop for only a few ids, it works as expected.
Here is the last console output:
18/05/30 16:54:57 INFO ShuffleBlockFetcherIterator: Getting 6 non-empty blocks out of 200 blocks
18/05/30 16:54:57 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
18/05/30 16:54:57 INFO Executor: Finished task 199.0 in stage 3792.0 (TID 341227). 3597 bytes result sent to driver
18/05/30 16:54:57 INFO TaskSetManager: Finished task 199.0 in stage 3792.0 (TID 341227) in 10 ms on localhost (executor driver) (200/200)
18/05/30 16:54:57 INFO TaskSchedulerImpl: Removed TaskSet 3792.0, whose tasks have all completed, from pool
18/05/30 16:54:57 INFO DAGScheduler: ShuffleMapStage 3792 (run at ThreadPoolExecutor.java:1149) finished in 88.910 s
18/05/30 16:54:57 INFO DAGScheduler: looking for newly runnable stages
18/05/30 16:54:57 INFO DAGScheduler: running: Set()
18/05/30 16:54:57 INFO DAGScheduler: waiting: Set(ResultStage 3793)
18/05/30 16:54:57 INFO DAGScheduler: failed: Set()
18/05/30 16:54:57 INFO DAGScheduler: Submitting ResultStage 3793 (MapPartitionsRDD[13099] at run at ThreadPoolExecutor.java:1149), which has no missing parents
18/05/30 16:54:57 INFO MemoryStore: Block broadcast_3795 stored as values in memory (estimated size 34.9 KB, free 366.2 MB)
18/05/30 16:54:57 INFO MemoryStore: Block broadcast_3795_piece0 stored as bytes in memory (estimated size 15.9 KB, free 366.2 MB)
18/05/30 16:54:57 INFO BlockManagerInfo: Added broadcast_3795_piece0 in memory on 192.168.99.210:53425 (size: 15.9 KB, free: 366.3 MB)
18/05/30 16:54:57 INFO SparkContext: Created broadcast 3795 from broadcast at DAGScheduler.scala:1006
18/05/30 16:54:57 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 3793 (MapPartitionsRDD[13099] at run at ThreadPoolExecutor.java:1149) (first 15 tasks are for partitions Vector(0, 1))
18/05/30 16:54:57 INFO TaskSchedulerImpl: Adding task set 3793.0 with 2 tasks
18/05/30 16:54:57 INFO TaskSetManager: Starting task 1.0 in stage 3793.0 (TID 341228, localhost, executor driver, partition 1, PROCESS_LOCAL, 4726 bytes)
18/05/30 16:54:57 INFO Executor: Running task 1.0 in stage 3793.0 (TID 341228)
18/05/30 16:54:57 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
18/05/30 16:54:57 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
18/05/30 16:54:57 INFO Executor: Finished task 1.0 in stage 3793.0 (TID 341228). 3826 bytes result sent to driver
18/05/30 16:54:57 INFO TaskSetManager: Starting task 0.0 in stage 3793.0 (TID 341229, localhost, executor driver, partition 0, ANY, 4726 bytes)
18/05/30 16:54:57 INFO Executor: Running task 0.0 in stage 3793.0 (TID 341229)
18/05/30 16:54:57 INFO TaskSetManager: Finished task 1.0 in stage 3793.0 (TID 341228) in 6 ms on localhost (executor driver) (1/2)
18/05/30 16:54:57 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 200 blocks
18/05/30 16:54:57 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
18/05/30 16:54:57 INFO Executor: Finished task 0.0 in stage 3793.0 (TID 341229). 3847 bytes result sent to driver
18/05/30 16:54:57 INFO TaskSetManager: Finished task 0.0 in stage 3793.0 (TID 341229) in 6 ms on localhost (executor driver) (2/2)
18/05/30 16:54:57 INFO TaskSchedulerImpl: Removed TaskSet 3793.0, whose tasks have all completed, from pool
18/05/30 16:54:57 INFO DAGScheduler: ResultStage 3793 (run at ThreadPoolExecutor.java:1149) finished in 0.013 s
18/05/30 16:54:57 INFO DAGScheduler: Job 1266 finished: run at ThreadPoolExecutor.java:1149, took 88.934566 s
I'm looking for the exact reason this fails and/or ways to improve the job so that it doesn't fail.
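For reference, each loop iteration adds a filter, an aggregation, two joins and a union to the plan of withBansDf, so the plan grows with the number of ids. Below is a minimal sketch of a loop-free shape the same computation could take. It is an untested-on-real-data illustration, not a confirmed fix. It assumes the ids are available as a one-column DataFrame (called champions here, a hypothetical name), that champion_id is an Int matching the element type of the ban arrays, and that the games DataFrame has no champion_id column of its own. The object name and the toy data are made up.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{broadcast, col, count, expr, lit}

object BanRatesSketch {

  // Ban rate per (patch, champion) computed in one pass over the games.
  // `champions` is a hypothetical one-column DataFrame (champion_id: Int)
  // standing in for the ids that the loop iterates over.
  def banRates(games: DataFrame, champions: DataFrame, gamesByPatch: DataFrame): DataFrame = {
    // Same predicate as the loop's where(...), written as a join condition so
    // that all champions are matched at once. A game is counted once per
    // champion even if both teams banned that champion.
    val banned = expr(
      "array_contains(team1_bans, champion_id) OR array_contains(team2_bans, champion_id)")

    games
      .join(broadcast(champions), banned)
      .groupBy("patch_game_version", "champion_id")
      .agg(count(lit(1)).as("banned_count"))
      .join(gamesByPatch, Seq("patch_game_version"))
      .withColumn("ban_rate", col("banned_count") / col("games_count"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("ban-rates-sketch")
      .getOrCreate()
    import spark.implicits._

    // Toy data, invented purely for illustration.
    val games = Seq(
      ("8.10", Seq(1, 2), Seq(3, 1)),
      ("8.10", Seq(2, 4), Seq(5, 6)),
      ("8.11", Seq(1, 7), Seq(8, 9))
    ).toDF("patch_game_version", "team1_bans", "team2_bans")
    val champions = Seq(1, 2, 3).toDF("champion_id")
    val gamesByPatch = games
      .groupBy("patch_game_version")
      .agg(count(lit(1)).as("games_count"))

    banRates(games, champions, gamesByPatch).show()
    spark.stop()
  }
}
```

The result could then be joined to inProgress on champion_id and patch_game_version once, as the loop body does per id. The join condition is not an equality, so Spark has to compare every game with every champion; that is presumably acceptable only while the champions DataFrame stays small enough to broadcast.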