Эта функция не встроена в Apache Beam, однако есть несколько вариантов. JdbcIO использует стандартный интерфейс Java JDB C для подключения к вашей базе данных. Не составит труда перегрузить драйвер Mysql JDB C вашей собственной оболочкой, которая устанавливает туннель S SH перед подключением. Я быстро выполнил поиск в Google и нашел проект, который оборачивает произвольный драйвер JDB C в туннель S SH, используя SSHJ: jdb c -sshj (копия публикуется в maven как com .cekrli c: JDB c -sshj: 0.1.0). Проект выглядит несколько необработанным, но он будет делать то, что вы хотите. Добавьте это к зависимостям времени выполнения, а затем обновите конфигурацию до чего-то подобного (этот пример небезопасен):
pipeline.apply(JdbcIO.<KV<Integer, String>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.cekrlic.jdbc.ssh.tunnel.SshJDriver",
"jdbc:sshj://sshbastion?remote=database:3306&username=sshuser&password=sshpassword&verify_hosts=off;;;jdbc:mysql://localhost:3306/mydb")
.username("username")
.withPassword("password"))
.withQuery("select id,name from Person")
.withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))
.withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
return KV.of(resultSet.getInt(1), resultSet.getString(2));
}
})
);
Если вы используете Dataflow, вы можете настроить виртуальную машину GCE для работы в качестве шлюза. На этой виртуальной машине используйте переадресацию S SH для туннелирования базы данных на внешний интерфейс виртуальной машины (ssh -R \*:3306:database:3306 sshbastion
), сделайте порт доступным в VP C, а затем запустите задание потока данных на своем VP C. . Если ваша база данных уже запущена в GCP, вы можете использовать этот подход, чтобы запустить задание потока данных на том же VP C, что и база данных, и отбросить шаг S SH.