Но если исполнитель умирает сразу после завершения commit () или перед вызовом rollback (), пытается ли Spark перезапустить задачу и снова записать тот же раздел данных, по существу создавая дублированные зафиксированные строки в базе данных?
Чего бы вы ожидали, поскольку Spark SQL (который является высокоуровневым API по сравнению с RDD API) на самом деле мало что знает обо всех особенностях JDBC или любого другого протокола? Не говоря уже о базовой среде выполнения, то есть Spark Core.
Когда вы пишете структурированный запрос наподобие df.write.format(“jdbc”).option(...).save()
Spark SQL преобразует его в распределенные вычисления с использованием низкоуровневого RDD API, подобного сборке. Поскольку он пытается охватить как можно больше «протоколов» (включая JDBC), API-интерфейс данных Spark SQL оставляет большую часть обработки ошибок самому источнику данных.
Ядро Spark, которое планирует задачи (которое не знает или даже не заботится о том, какие задачи выполняют), просто следит за выполнением, и если задача не выполняется, она пытается выполнить ее снова (до 3 неудачных попыток по умолчанию).
Поэтому, когда вы пишете пользовательский источник данных, вы знаете детали и вам приходится иметь дело с такими повторениями в вашем коде.
Одним из способов обработки ошибок является регистрация прослушивателя задач с использованием TaskContext (например, addTaskCompletionListener
или addTaskFailureListener
).