Как избавиться от соединений с базой данных, когда Flink Retarts - PullRequest
1 голос
/ 12 июня 2019

Я использую dbcp2.BasicDataSource как пул соединений с базой данных. Запрос к базе данных используется в некоторой функции карты для получения дополнительной информации о датчиках; Я обнаружил, что при перезапуске задания перезапуска из-за исключений старые соединения с БД все еще активны на стороне сервера.

Flink версия 1.7

Код конструкции BasicDataSource здесь

object DbHelper extends Lazing with Logging {
    private lazy val connectionPool: BasicDataSource = createDataSource()

    private def createDataSource(): BasicDataSource = {
        val conn_str = props.getProperty("db.url")
        val conn_user = props.getProperty("db.user")
        val conn_pwd = props.getProperty("db.pwd")
        val initialSize = props.getProperty("db.initial.size", "3").toInt

        val bds = new BasicDataSource
        bds.setDriverClassName("org.postgresql.Driver")
        bds.setUrl(conn_str)
        bds.setUsername(conn_user)
        bds.setPassword(conn_pwd)
        bds.setInitialSize(initialSize)
        bds
    }
}

1 Ответ

0 голосов
/ 13 июня 2019

Измените вашу функцию карты на RichMapFunction. Переопределите close() метод RichMapFunction и поместите код, чтобы закрыть соединение с базой данных. Вероятно, вы должны поместить код для открытия соединения и в метод open().

...