Я хочу использовать пул соединений в потоковой структурированной искре, но я не хочу использовать writeStream для создания нового пула соединений, как мне обновить?
Теперь я хочу установить переменные в приложении.conf, объект загружается до получения местоположения application.conf в основном методе, например:
class JdbcSink extends ForeachWriter[Row] with Serializable with Settings {
self: SinkStatement =>
import JdbcSink.dsPool
var connection: Connection = _
//the sql statement
var statement: Statement = _
//open
def open(partitionId: Long, version: Long): Boolean = {
connection = dsPool.getConnection()
statement = connection.createStatement()
true
}
//execute
def process(value: Row): Unit = {
//execute
statement.executeUpdate(this.make(value))
}
//close
def close(errorOrNull: Throwable): Unit = {
//close the connection
dsPool.evictConnection(connection)
}
}
object JdbcSink extends Settings {
val config = new HikariConfig
config.setDriverClassName(this.sinkDbDriver)
config.setJdbcUrl(this.sinkDbUrl)
config.setUsername(this.sinkDbUser)
config.setPassword(this.sinkDbPwd)
config.addDataSourceProperty("cachePrepStmts", "true")
config.addDataSourceProperty("prepStmtCacheSize", "250")
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048")
val dsPool = new HikariDataSource(this.HConfig)
}
Как получить гибкие настройки? Например, «с настройками» get «не может ininitiallize jdbcSink»исключение
val config = new HikariConfig
config.setDriverClassName (this.sinkDbDriver)
config.setJdbcUrl (this.sinkDbUrl)
config.setUsername (this.sinkDbUser)
config.setPassword (this.sinkDbPwd)
config.addDataSourceProperty ("cachePrepStmts", "true")
config.addDataSourceProperty ("prepStmtCacheSize", "250")
config.addDataSourceProperty ("prepStmtCacheSqlLimit", "2048")
val dsPool = new HikariDataSource (this.HConfig)