Потоковый конвейер для JdbcIo.write () создает около 300 соединений для Cloud Sql (MySql) на таблицу - PullRequest
0 голосов
/ 26 сентября 2018

У нас есть конвейер для чтения данных из OracleServer и записи их в Cloud Sql (mysql).Мы хотим получать любые новые данные из oracle db в режиме реального времени в mysql db.Но JdbcIo только для ограниченных источников (это пакетный режим).

Затем мы использовали GenerateSequence(), как предложено здесь , с JdbcIo.readAll().Это означает, что конвейер теперь является потоковым режимом.

Но проблема в том, что когда мы записываем его в Cloud Sql, используя JdbcIo.write(), он создает слишком много соединений для одной записи.Около 300 соединений на таблицу.

Если я не использую GenerateSequence(), который является пакетным режимом, конвейер использует только несколько соединений.

У нас есть около 20 таблиц, которые нужно интегрировать сиспользуя Dataflow с потоковым режимом, но подключение CloudSql (mysql) limit равно 4000, поэтому мы не можем этого сделать.

Есть ли какие-нибудь обходные пути, лучшие практики или хитрости здесь?

Спасибо.

Мой фрагмент трубопровода:

pipeline
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(intervalSeconds)))

        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(intervalSeconds))))

        .apply(JdbcIO
                .<Long, MyBean>readAll()
                .withDataSourceConfiguration(
                        JdbcIO.DataSourceConfiguration.create("oracle.jdbc.driver.OracleDriver", oracleUrl)
                        .withUsername(oracleUser)
                        .withPassword(oraclePassword))
                .withQuery(readQuery)
                .withCoder(new MyBeanCoder())
                .withRowMapper(new MyBeanRowMapper())
                .withParameterSetter(new MyParameterSetter()))

        .apply(JdbcIO
                .<MyBean>write()
                .withDataSourceConfiguration(
                        JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", mysqlUrl)
                        .withUsername(mysqlUser)
                        .withPassword(mysqlPassword)))
                .withStatement(upsertQuery)
                .withPreparedStatementSetter(new MyBeanPrepareStatement())
        ;
...