Я использую Apache Beam и облачный поток данных Google для вставки информации в облачную базу данных SQL.До сих пор это работало отлично, записывая в одну таблицу.Отправляемая информация расширяется, включая информацию, предназначенную для другой таблицы в базе данных.
Мне было бы любопытно, если бы был способ динамически использовать запрос SQL на основе информации, которую я получаю, или я могу каким-то образом создать конвейер для выполнения нескольких запросов?Либо будет работать ...
Или я застрял с необходимостью создать отдельный конвейер?
Приветствия,
РЕДАКТИРОВАТЬ: Добавление моей текущей конфигурации конвейера
MainPipeline = Pipeline.create(options);
MainPipeline.apply(PubsubIO.readStrings().fromSubscription(MAIN_SUBSCRIPTION))
.apply(JdbcIO.<String> write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.cj.jdbc.Driver", JDBC_URL)
.withUsername(JDBC_USER).withPassword(JDBC_PASS))
.withStatement(QUERY_SQL).withPreparedStatementSetter(new NewPreparedStatementSetter() {
}));