Как закрыть общее одноэлементное соединение между ядрами в Spark Executor - PullRequest
0 голосов
/ 11 июня 2018

Я использую общее соединение между всеми ядрами одного исполнителя Spark.По сути, я создал одноэлементный объект соединения, чтобы разделить его между ядрами одного исполнителя, чтобы он был распределен между ядрами, и для каждого исполнителя будет только 1 соединение.

object SingletonConnection {

private var connection: Connection = null

def getConnection(url: String, username: String, password: String): Connection = synchronized {
if (connection == null) {
  connection = DriverManager.getConnection(url, username, password)
}
connection
}
}

Код исполнителя Spark:

dataFrame.foreachPartition { batch =>
  if (batch.nonEmpty) {
    lazy val dbConnection = SingletonConnection
    val dbc = dbConnection.getConnection(url, user, password)

    // do some operatoins


          st.addBatch()
        }
        st.executeBatch()
      }
    }
    catch {
      case exec: BatchUpdateException =>
        var ex: SQLException = exec
        while (ex != null) {
          ex.printStackTrace()
          ex = ex.getNextException
        }
        throw exec
    }

  }
}

Проблема в том, что я не могу закрыть соединение.Поскольку я не буду знать, когда конкретное ядро ​​завершит свое выполнение.Если я окончательно закрываю соединение, как только одно ядро ​​завершает свою задачу, оно закрывает соединение, и это приводит к остановке всех других ядер, так как общее соединение закрыто.

Поскольку я не закрываю соединение здесь, соединениеостается открытым даже после завершения задачи.Как я могу заставить этот процесс работать так, чтобы я мог закрыть соединение ТОЛЬКО ПОСЛЕ ВСЕХ ЯДЕР, ЗАКОНЧИВШИХСЯ ИХ ЗАДАЧИ.

1 Ответ

0 голосов
/ 30 октября 2018

Я реализовал это с помощью Java, поэтому я могу дать вам некоторую подсказку.

В классе SingletonConnection я создал потокобезопасный аккумулятор.Каждый раз, когда соединение открывается, аккумулятор увеличивается на единицу.И каждый раз перед закрытием соединения аккумулятор уменьшается на единицу и проверяется, равен ли аккумулятор нулю.Когда аккумулятор равен нулю, вы можете закрыть соединение.

Это не закроет соединение, когда другие запущенные потоки все еще используют соединение.Но это позволит вам создать больше соединений, чем вы думали (количество разделов).

...