Я использую автономный кластер Spark 2.3.1.
Моя работа сводится к потреблению мини-пакетов Kafka каждые 2 минуты и записи агрегации в некоторый магазин Работа выглядит следующим образом:
val stream = KafkaUtils.createDirectStream(...)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
stream.map(x=> Row(...))
.flatMap(r=>...)
.map(r=> (k,r))
.reduceByKey((r1, r2) => r)
.map { case (_, v) => v}
.foreachRDD { (rdd, time) => // write data}
Работа была в порядке и выполнялась почти неделю. Но потом это сложилось - я вижу, что партии запущены, но в Кафке началось отставание, а также смещения стали неизменными.
В логах присутствуют следующие сообщения, и я вижу, что они повторяются каждые 2 минуты (ИМХО, интервал между партиями). Поэтому я полагаю, что работа зависла из-за повторных попыток.
[org.apache.spark.scheduler.TaskSetManager] Starting task 14.0 in stage 13042.0 (TID 7732814, 10.9.6.53, executor 5, partition 14, PROCESS_LOCAL, 7846 bytes)
[org.apache.spark.scheduler.TaskSetManager] Starting task 0.0 in stage 13042.0 (TID 7732815, 10.9.6.77, executor 3, partition 0, PROCESS_LOCAL, 7846 bytes)
[org.apache.spark.scheduler.TaskSetManager] Starting task 1.0 in stage 13042.0 (TID 7732816, 10.9.6.61, executor 2, partition 1, PROCESS_LOCAL, 7846 bytes)
[org.apache.spark.scheduler.TaskSetManager] Starting task 4.0 in stage 13042.0 (TID 7732812, 10.9.6.65, executor 1, partition 4, PROCESS_LOCAL, 7846 bytes)
[org.apache.spark.scheduler.TaskSetManager] Starting task 28.0 in stage 13042.0 (TID 7732813, 10.9.6.62, executor 4, partition 28, PROCESS_LOCAL, 7846 bytes)
[org.apache.spark.scheduler.TaskSchedulerImpl] Adding task set 13042.0 with 186 tasks
[org.apache.spark.scheduler.DAGScheduler] Submitting 186 missing tasks from ShuffleMapStage 13042 (MapPartitionsRDD[117358] at map at MyAgg.scala:373) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
Перезапуск решил проблему.
Мой вопрос - есть ли идеи, почему это произошло, и что я могу сделать / настроить, чтобы это не повторилось?