У нас есть конвейер для чтения данных из OracleServer и записи их в Cloud Sql (mysql).Мы хотим получать любые новые данные из oracle db в режиме реального времени в mysql db.Но JdbcIo
только для ограниченных источников (это пакетный режим).
Затем мы использовали GenerateSequence()
, как предложено здесь , с JdbcIo.readAll()
.Это означает, что конвейер теперь является потоковым режимом.
Но проблема в том, что когда мы записываем его в Cloud Sql, используя JdbcIo.write()
, он создает слишком много соединений для одной записи.Около 300 соединений на таблицу.
Если я не использую GenerateSequence()
, который является пакетным режимом, конвейер использует только несколько соединений.
У нас есть около 20 таблиц, которые нужно интегрировать сиспользуя Dataflow с потоковым режимом, но подключение CloudSql (mysql) limit равно 4000, поэтому мы не можем этого сделать.
Есть ли какие-нибудь обходные пути, лучшие практики или хитрости здесь?
Спасибо.
Мой фрагмент трубопровода:
pipeline
.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(intervalSeconds)))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(intervalSeconds))))
.apply(JdbcIO
.<Long, MyBean>readAll()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create("oracle.jdbc.driver.OracleDriver", oracleUrl)
.withUsername(oracleUser)
.withPassword(oraclePassword))
.withQuery(readQuery)
.withCoder(new MyBeanCoder())
.withRowMapper(new MyBeanRowMapper())
.withParameterSetter(new MyParameterSetter()))
.apply(JdbcIO
.<MyBean>write()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", mysqlUrl)
.withUsername(mysqlUser)
.withPassword(mysqlPassword)))
.withStatement(upsertQuery)
.withPreparedStatementSetter(new MyBeanPrepareStatement())
;