как переопределить ForeachWriter с пулом соединений в искровой структурированной потоковой передаче - PullRequest
1 голос
/ 16 апреля 2019

Я хочу использовать пул соединений в потоковой структурированной искре, но я не хочу использовать writeStream для создания нового пула соединений, как мне обновить?

Теперь я хочу установить переменные в приложении.conf, объект загружается до получения местоположения application.conf в основном методе, например:

class JdbcSink extends ForeachWriter[Row] with Serializable with Settings {
  self: SinkStatement =>

  import JdbcSink.dsPool

  var connection: Connection = _
  //the sql statement
  var statement: Statement = _

  //open
  def open(partitionId: Long, version: Long): Boolean = {
    connection = dsPool.getConnection()
    statement = connection.createStatement()
    true
  }

  //execute
  def process(value: Row): Unit = {
    //execute
    statement.executeUpdate(this.make(value))
  }


  //close
  def close(errorOrNull: Throwable): Unit = {
    //close the connection
    dsPool.evictConnection(connection)
  }

}
object JdbcSink extends Settings {


  val config = new HikariConfig
  config.setDriverClassName(this.sinkDbDriver)
  config.setJdbcUrl(this.sinkDbUrl)
  config.setUsername(this.sinkDbUser)
  config.setPassword(this.sinkDbPwd)
  config.addDataSourceProperty("cachePrepStmts", "true")
  config.addDataSourceProperty("prepStmtCacheSize", "250")
  config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048")
  val dsPool = new HikariDataSource(this.HConfig)
}

Как получить гибкие настройки? Например, «с настройками» get «не может ininitiallize jdbcSink»исключение

      val config = new HikariConfig
      config.setDriverClassName (this.sinkDbDriver)
      config.setJdbcUrl (this.sinkDbUrl)
      config.setUsername (this.sinkDbUser)
      config.setPassword (this.sinkDbPwd)
      config.addDataSourceProperty ("cachePrepStmts", "true")
      config.addDataSourceProperty ("prepStmtCacheSize", "250")
      config.addDataSourceProperty ("prepStmtCacheSqlLimit", "2048")
      val dsPool = new HikariDataSource (this.HConfig)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...