Как избежать ошибки ORA-00060 (обнаружена взаимоблокировка) при обновлении таблицы Oracle в Spark - PullRequest
0 голосов
/ 08 января 2020

У меня странная ошибка в моей работе с искрой, и я бы использовал некоторые объяснения, если это возможно.

Итак, моя работа Spark загружает данные из таблицы Hive, преобразует их в Dataframe и затем обновляет уже существующую Oracle таблица на основе некоторых столбцов.

Когда размер кадра данных не очень велик, работа выполняется без проблем. Когда массив данных довольно большой, задание выполняется в течение нескольких часов, а затем останавливается с ошибкой Oracle:

exception caught: org.apache.spark.SparkException: Job aborted due to stage failure: Task 104 in stage 43.0 failed 4 times, most recent failure: Lost task 104.3 in stage 43.0 (TID 5937, lxpbda55.ra1.intra.groupama.fr, executor 227): java.sql.BatchUpdateException: ORA-00060: deadlock detected while waiting for resource

. Вот как работает мой код:

//This is where the error appears
modification(df_Delta_Modif, champs, conditions, cstProp)

//This is its definition
def modification(df: DataFrame, champs: List[String], conditions: List[String], cstProp: java.util.Properties) {
    val url = Parametre_mod.oracleUrl
    val options: JDBCOptions = new JDBCOptions(Map("url" -> url, "dbtable" -> Parametre_mod.targetTableBase, "user" -> Parametre_mod.oracleUser,
      "password" -> Parametre_mod.oraclePassword, "driver" -> "oracle.jdbc.driver.OracleDriver", "batchSize" -> "30000"))
    Crud_mod.modifierbatch(df, options, champs, conditions)
  }

//This is the definition of modifierbatch. It starts with establishing a connection to Oracle.
//Which surely works because I use the same thing on other scripts and it works fine
def modifierbatch(df: DataFrame,
              options : JDBCOptions,
               champs: List[String],
               conditions: List[String]) {
    val url = options.url
    val tables = options.table
    val dialect = JdbcDialects_mod.get(url)
    val nullTypes: Array[Int] = df.schema.fields.map { field =>
      getJdbcType(field.dataType, dialect).jdbcNullType
    }
    val rddSchema = df.schema
    val getConnection: () => Connection = createConnectionFactory(options)
    val batchSize = options.batchSize
    val chainestmt = creerOdreSQLmodificationSimple(champs, conditions, tables) //definition below
    val listChamps: List[Int] = champs.map(rddSchema.fieldIndex):::conditions.map(rddSchema.fieldIndex)
    df.foreachPartition { iterator =>
      //savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
      executePartition(getConnection, tables, iterator, rddSchema, nullTypes, batchSize, chainestmt, listChamps, dialect, 0, "")
    }
  }

//This is the definition of creerOdreSQLmodificationSimple
def creerOdreSQLmodificationSimple(listChamps: List[String], listCondition: List[String], tablecible: String): String = {
    val champs = listChamps.map(_.toUpperCase).mkString(" = ?, ")
    val condition = listCondition.map(_.toUpperCase).mkString(" = ? and ")

    s"""UPDATE ${tablecible} SET ${champs} = ? WHERE ${condition} = ?"""
  }

Вы можете видеть, что директор не очень сложен. Я просто выполняю Oracle функцию (обновление), используя пакет. Я не знаю, что вызывает проблему тупика. Я не использовал перераспределение в Spark.

Пожалуйста, дайте мне знать, если вам нужна дополнительная информация. Спасибо

1 Ответ

0 голосов
/ 08 января 2020

При использовании df.foreachPartition похоже, что доступ к базе данных осуществляется по нескольким параллельным соединениям.

Если это так, в каждом разделе должны быть условия, которые обновляют одни и те же строки.

Ваши варианты:

  1. Избавьтесь от перекрытия, чтобы никакие два обновления не обновляли одну и ту же строку (и).
  2. Если вы не можете этого сделать, расположите вещи так, чтобы все обновления, затрагивающие данную строку, гарантированно находятся в одном и том же «разделе».
  3. Если вы не можете этого сделать, отсортируйте значения условий перед их обработкой. Например, если ваши условия похожи на column1 = ? and column2 = ? и ваши значения установлены {{1, 'R'), (5, 'Q'), (1, 'B'), (2, 'Z')}, тогда отсортировать их (1, «B») -> (1, «R») -> (2, «Z») -> (5, «Q»). На самом деле не имеет значения, как вы сортируете их, пока порядок сортировки однозначен (без связей) и все разделы сортируют свои условия одинаково.
  4. Не используйте foreachPartition (т.е. не не пытайтесь работать параллельно). На самом деле это всего лишь вариант # 2 выше.

Сортировка работы согласно варианту 3 позволит избежать тупика, но вы потеряете большую часть преимущества параллельной работы (так как некоторые разделы будут блокировать другие).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...