Я использую общее соединение между всеми ядрами одного исполнителя Spark.По сути, я создал одноэлементный объект соединения, чтобы разделить его между ядрами одного исполнителя, чтобы он был распределен между ядрами, и для каждого исполнителя будет только 1 соединение.
object SingletonConnection {
private var connection: Connection = null
def getConnection(url: String, username: String, password: String): Connection = synchronized {
if (connection == null) {
connection = DriverManager.getConnection(url, username, password)
}
connection
}
}
Код исполнителя Spark:
dataFrame.foreachPartition { batch =>
if (batch.nonEmpty) {
lazy val dbConnection = SingletonConnection
val dbc = dbConnection.getConnection(url, user, password)
// do some operatoins
st.addBatch()
}
st.executeBatch()
}
}
catch {
case exec: BatchUpdateException =>
var ex: SQLException = exec
while (ex != null) {
ex.printStackTrace()
ex = ex.getNextException
}
throw exec
}
}
}
Проблема в том, что я не могу закрыть соединение.Поскольку я не буду знать, когда конкретное ядро завершит свое выполнение.Если я окончательно закрываю соединение, как только одно ядро завершает свою задачу, оно закрывает соединение, и это приводит к остановке всех других ядер, так как общее соединение закрыто.
Поскольку я не закрываю соединение здесь, соединениеостается открытым даже после завершения задачи.Как я могу заставить этот процесс работать так, чтобы я мог закрыть соединение ТОЛЬКО ПОСЛЕ ВСЕХ ЯДЕР, ЗАКОНЧИВШИХСЯ ИХ ЗАДАЧИ.