Как Spark обрабатывает сценарии сбоев с использованием источника данных JDBC? - PullRequest
0 голосов
/ 10 января 2019

Я пишу источник данных, который имеет сходство с реализацией источника данных JDBC в Spark, и я хотел бы спросить, как Spark обрабатывает определенные сценарии сбоев. Насколько я понимаю, если исполнитель умирает во время выполнения задачи, Spark возродит исполнителя и попытается повторно запустить эту задачу. Однако как это работает в контексте целостности данных и API источника данных JDBC Spark (например, df.write.format("jdbc").option(...).save())?

В функции savePartition JdbcUtils.scala мы видим, что Spark вызывает функции фиксации и отката объекта соединения Java, созданного на основе URL-адреса / учетных данных базы данных, предоставленных пользователем (см. Ниже). Но если исполнитель умирает сразу после завершения commit () или перед вызовом rollback (), пытается ли Spark перезапустить задачу и снова записать тот же раздел данных, по существу создавая дублированные зафиксированные строки в базе данных? И что произойдет, если исполнитель умрет в середине вызова commit () или rollback ()?

try {
    ...
    if (supportsTransactions) {
        conn.commit()
    }
    committed = true
    Iterator.empty
} catch {
    case e: SQLException =>
        ...
        throw e
} finally {
    if (!committed) {
        // The stage must fail.  We got here through an exception path, so
        // let the exception through unless rollback() or close() want to
        // tell the user about another problem.
        if (supportsTransactions) {
          conn.rollback()
        }
        conn.close()
    } else {
        ...
    }
}

Ответы [ 2 ]

0 голосов
/ 11 января 2019

Но если исполнитель умирает сразу после завершения commit () или перед вызовом rollback (), пытается ли Spark перезапустить задачу и снова записать тот же раздел данных, по существу создавая дублированные зафиксированные строки в базе данных?

Чего бы вы ожидали, поскольку Spark SQL (который является высокоуровневым API по сравнению с RDD API) на самом деле мало что знает обо всех особенностях JDBC или любого другого протокола? Не говоря уже о базовой среде выполнения, то есть Spark Core.

Когда вы пишете структурированный запрос наподобие df.write.format(“jdbc”).option(...).save() Spark SQL преобразует его в распределенные вычисления с использованием низкоуровневого RDD API, подобного сборке. Поскольку он пытается охватить как можно больше «протоколов» (включая JDBC), API-интерфейс данных Spark SQL оставляет большую часть обработки ошибок самому источнику данных.

Ядро Spark, которое планирует задачи (которое не знает или даже не заботится о том, какие задачи выполняют), просто следит за выполнением, и если задача не выполняется, она пытается выполнить ее снова (до 3 неудачных попыток по умолчанию).

Поэтому, когда вы пишете пользовательский источник данных, вы знаете детали и вам приходится иметь дело с такими повторениями в вашем коде.

Одним из способов обработки ошибок является регистрация прослушивателя задач с использованием TaskContext (например, addTaskCompletionListener или addTaskFailureListener).

0 голосов
/ 10 января 2019

Мне пришлось ввести некоторую логику дедупликации именно по описанным причинам. Вы можете получить одно и то же совершенное дважды (или больше).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...