Я пытаюсь выполнить сбор данных об изменениях из базы данных oracle, используя поток данных весеннего облака с kafka в качестве брокера. Я использую механизм опроса для этого. Я опрашиваю базу данных с помощью основного запроса на выборку через регулярные промежутки времени для сбора любых обновленных данных. Для лучшей системы защиты от сбоев я сохранил время последнего опроса в oracle DB и использовал его для получения данных, которые обновляются после последнего опроса.
public MessageSource<Object> jdbcMessageSource() {
JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
new JdbcPollingChannelAdapter(this.dataSource, this.properties.getQuery());
jdbcPollingChannelAdapter.setUpdateSql(this.properties.getUpdate());
return jdbcPollingChannelAdapter;
}
@Bean
public IntegrationFlow pollingFlow() {
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(jdbcMessageSource(),spec -> spec.poller(Pollers.fixedDelay(3000)));
flowBuilder.channel(this.source.output());
flowBuilder.transform(trans,"transform");
return flowBuilder.get();
}
Мои запросы в свойствах приложения следующие:
query: select * from kafka_test where LAST_UPDATE_TIME >(select LAST_POLL_TIME from poll_time)
update : UPDATE poll_time SET LAST_POLL_TIME = CURRENT_TIMESTAMP
Это прекрасно работает для меня. При таком подходе я могу получить CDC из БД.
Проблема, которую я сейчас рассматриваю, ниже:
Создание таблицы только для поддержания времени опроса является чрезмерной нагрузкой. Я ищу ведение этого последнего опроса в теме кафки и извлекать это время из темы кафки, когда я делаю следующий опрос.
Я изменил метод jdbcMessageSource
, как показано ниже:
public MessageSource<Object> jdbcMessageSource() {
String query = "select * from kafka_test where LAST_UPDATE_TIME > '"+<Last poll time value read from kafka comes here>+"'";
JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
new JdbcPollingChannelAdapter(this.dataSource, query);
return jdbcPollingChannelAdapter;
}
Но поток данных Spring создает экземпляр компонента pollingFlow () (см. Код выше) только один раз. Следовательно, любой запрос, который выполняется первым, останется прежним. Я хочу обновить запрос новым временем опроса для каждого опроса.
Есть ли способ написать пользовательский Integrationflow
, чтобы этот запрос обновлялся каждый раз, когда я делаю опрос?
Я попробовал IntegrationFlowContext
для этого, но не удалось.
Заранее спасибо !!!