Apache beam предоставляет разъем JDBCIO для подключения к CloudSql postgreSQL.Моя работа читает событие из pub / sub.Тело события выглядит следующим образом:
tableName,
list<value>
Мне нужно записать в таблицу на основе имени таблицы, которое я получаю из моего сообщения.
JDBCIO подготовил оператор, который позволитмне параметризировать значения в моем запросе вставки.Но мне нужно генерировать запрос вставки динамически на основе информации, представленной в событии.
pipeline
.apply(PubsubIO.readStrings().fromSubscription())
.apply(convertToKV())
.apply(JdbcIO.<List<String>>>write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
.withUsername("username")
.withPassword("password"))
.withStatement("insert into Person values(?, ?)")
.withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
public void setParameters(KV<Integer, String> element, PreparedStatement query)
throws SQLException {
i=0
for each element in list
query.setInt(i, element.get(i);
i++;
}
})
);
Я должен иметь возможность динамически создавать оператор SQL на основе события ввода из коллекции pcollection.Мой оператор выбора должен генерироваться динамически на основе значения списка и имени таблицы.Пожалуйста, дайте мне знать, можем ли мы сделать это или нет.
Обновление: -
Я пытаюсь вручную вызвать драйвер jdbc внутри функции parDo, но получаю сообщение об ошибке ниже.Не найден подходящий драйвер для jdbcURL.
Пожалуйста, дайте мне знать, если я что-то пропустил:
@Setup
public void doAnyRequiredSetup() throws SQLException
{
LoggingContextUtil.installContext(loggingContext);
connection=DriverManager.getConnection(JdbcUrl,user,password);
statement=connection.createStatement();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("In doAnyRequiredSetup logging Context is now set and JDBC connection is .");
}
}
@SuppressWarnings("unchecked")
@ProcessElement
public void processElement(ProcessContext context)
{
JsonNode element=context.element();
try {
String query=formatQuery(baseQuery);
boolean result=statement.execute(query);
if(LOGGER.isDebugEnabled()) {
LOGGER.debug("Executed query : "+query+" and the result is "+ result);
}
} catch (IllegalArgumentException | SQLException e) {
ErrorMessage em = new ErrorMessage(element.toString(), "Insert Query Failed", e.getMessage());
context.output(ValidateTagHelper.FAILURE_TAG,em);
}
}