Spring Integration Flow с Jdbc источником сообщений с динамическим запросом - PullRequest
1 голос
/ 07 марта 2019

Я пытаюсь выполнить сбор данных об изменениях из базы данных oracle, используя поток данных весеннего облака с kafka в качестве брокера. Я использую механизм опроса для этого. Я опрашиваю базу данных с помощью основного запроса на выборку через регулярные промежутки времени для сбора любых обновленных данных. Для лучшей системы защиты от сбоев я сохранил время последнего опроса в oracle DB и использовал его для получения данных, которые обновляются после последнего опроса.

public MessageSource<Object> jdbcMessageSource() {
    JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
            new JdbcPollingChannelAdapter(this.dataSource, this.properties.getQuery());
    jdbcPollingChannelAdapter.setUpdateSql(this.properties.getUpdate());
    return jdbcPollingChannelAdapter;
}

@Bean
public IntegrationFlow pollingFlow() {
    IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(jdbcMessageSource(),spec -> spec.poller(Pollers.fixedDelay(3000)));
    flowBuilder.channel(this.source.output());
    flowBuilder.transform(trans,"transform");
    return flowBuilder.get();

}

Мои запросы в свойствах приложения следующие:

query: select * from kafka_test where LAST_UPDATE_TIME >(select LAST_POLL_TIME from poll_time)

update : UPDATE poll_time SET LAST_POLL_TIME = CURRENT_TIMESTAMP

Это прекрасно работает для меня. При таком подходе я могу получить CDC из БД.

Проблема, которую я сейчас рассматриваю, ниже:

Создание таблицы только для поддержания времени опроса является чрезмерной нагрузкой. Я ищу ведение этого последнего опроса в теме кафки и извлекать это время из темы кафки, когда я делаю следующий опрос.

Я изменил метод jdbcMessageSource, как показано ниже:

public MessageSource<Object> jdbcMessageSource() {
    String query = "select * from kafka_test where LAST_UPDATE_TIME > '"+<Last poll time value read from kafka comes here>+"'";

    JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
            new JdbcPollingChannelAdapter(this.dataSource, query);
    return jdbcPollingChannelAdapter;
}

Но поток данных Spring создает экземпляр компонента pollingFlow () (см. Код выше) только один раз. Следовательно, любой запрос, который выполняется первым, останется прежним. Я хочу обновить запрос новым временем опроса для каждого опроса.

Есть ли способ написать пользовательский Integrationflow, чтобы этот запрос обновлялся каждый раз, когда я делаю опрос?

Я попробовал IntegrationFlowContext для этого, но не удалось.

Заранее спасибо !!!

Ответы [ 3 ]

1 голос
/ 07 марта 2019

У нас есть эта тестовая конфигурация (извините, это XML):

<inbound-channel-adapter query="select * from item where status=:status" channel="target"
                             data-source="dataSource" select-sql-parameter-source="parameterSource"
                             update="delete from item"/>


    <beans:bean id="parameterSource" factory-bean="parameterSourceFactory"
                factory-method="createParameterSourceNoCache">
        <beans:constructor-arg value=""/>
    </beans:bean>

    <beans:bean id="parameterSourceFactory"
                class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
        <beans:property name="parameterExpressions">
            <beans:map>
                <beans:entry key="status" value="@statusBean.which()"/>
            </beans:map>
        </beans:property>
        <beans:property name="sqlParameterTypes">
            <beans:map>
                <beans:entry key="status" value="#{ T(java.sql.Types).INTEGER}"/>
            </beans:map>
        </beans:property>
    </beans:bean>

    <beans:bean id="statusBean"
                class="org.springframework.integration.jdbc.config.JdbcPollingChannelAdapterParserTests$Status"/>

Обратите внимание на ExpressionEvaluatingSqlParameterSourceFactory и его createParameterSourceNoCache() фабрику. Этот результат может быть использован для select-sql-parameter-source.

У JdbcPollingChannelAdapter есть setSelectSqlParameterSource по этому вопросу.

Итак, вы настраиваете ExpressionEvaluatingSqlParameterSourceFactory, чтобы иметь возможность разрешать некоторый параметр запроса как выражение для вызова некоторого метода боба, чтобы получить желаемое значение от Kafka. Тогда createParameterSourceNoCache() поможет вам получить ожидаемый SqlParameterSource.

В документах также есть некоторая информация: https://docs.spring.io/spring-integration/docs/current/reference/html/#jdbc-inbound-channel-adapter

1 голос
/ 08 апреля 2019

С помощью обоих приведенных выше ответов мне удалось выяснить подход.Напишите jdbc template и оберните его как бин и используйте его для Integration Flow.

@EnableBinding(Source.class)
@AllArgsConstructor
public class StockSource {

  private DataSource dataSource;

  @Autowired
  private JdbcTemplate jdbcTemplate;

  private MessageChannelFactory messageChannelFactory;  // You can use normal message channel which is available in spring cloud data flow as well.

  private List<String> findAll() {
    jdbcTemplate = new JdbcTemplate(dataSource);
    String time = "10/24/60" . (this means 10 seconds for oracle DB)
    String query = << your query here like.. select * from test where (last_updated_time > time) >>;
    return jdbcTemplate.query(query, new RowMapper<String>() {
      @Override
      public String mapRow(ResultSet rs, int rowNum) throws SQLException {
          ...
          ...
          any row mapper operations that you want to do with you result after the poll.
          ...
          ...
          ...
        // Change the time here for the next poll to the DB. 
        return result;
      }
    });
  }

  @Bean
  public IntegrationFlow supplyPollingFlow() {

    IntegrationFlowBuilder flowBuilder = IntegrationFlows
        .from(this::findAll, spec -> {
          spec.poller(Pollers.fixedDelay(5000));
        });
    flowBuilder.channel(<<Your message channel>>);
    return flowBuilder.get();
  }

}

В нашем случае использования мы сохраняли время последнего опроса в теме кафки.Это должно было сделать состояние приложения меньше.Каждый новый опрос к БД теперь будет иметь новое время в состоянии where.

PS: ваш брокер обмена сообщениями (kafka / rabbit mq) должен работать в вашей локальной сети или подключаться к ним, если естьразмещен на другой платформе.

God Speed ​​!!!

1 голос
/ 07 марта 2019

См. Ответ Артема о механизме динамического запроса в стандартном адаптере; альтернативой, однако, было бы просто обернуть JdbcTemplate в Боб и вызвать его с

IntegrationFlows.from(myPojo(), "runQuery", e -> ...)
    ...

или даже простая лямбда

    .from(() -> jdbcTemplate...)
...