Как ограничить границы транзакций для JdbcPollingChannelAdapter - PullRequest
0 голосов
/ 19 декабря 2018

У меня есть JdbcPollingChannelAdapter, определенный следующим образом:

@Bean
public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
    JdbcPollingChannelAdapter jdbcPollingChannelAdapter = new JdbcPollingChannelAdapter(dataSource,
            "SELECT * FROM common_task where due_at <= NOW() and retries < order by due_at ASC FOR UPDATE SKIP LOCKED");
    jdbcPollingChannelAdapter.setMaxRowsPerPoll(1);
    jdbcPollingChannelAdapter.setUpdateSql("Update common_task set retries = :retries, due_at = due_at + interval '10 minutes' WHERE ID = (:id)");
    jdbcPollingChannelAdapter.setUpdatePerRow(true);
    jdbcPollingChannelAdapter.setRowMapper(this::mapRow);
    jdbcPollingChannelAdapter.setUpdateSqlParameterSourceFactory(this::updateParamSource);
    return jdbcPollingChannelAdapter;
}

Поток интеграции для этого:

@Bean
public IntegrationFlow pollingFlow(MessageSource<Object> jdbcMessageSource) {
    return IntegrationFlows.from(jdbcMessageSource,
            c -> c.poller(Pollers.fixedRate(250, TimeUnit.MILLISECONDS)
                    .maxMessagesPerPoll(1)
                    .transactional()))
            .split()
            .channel(taskSourceChannel())
            .get();
}

Активатор службы определен как

@ServiceActivator(inputChannel = "taskSourceChannel")
    public void doSomething(FooTask event) {
        //do something but ** not ** within the transaction of the poller.
    }      

Опрос в потоке интеграции определяется как транзакционный.Насколько я понимаю, это будет 1. Выполнить запрос выбора и обновить запрос в транзакции.2. Он также выполнит метод doSomething () в той же транзакции.

Цель: я хотел бы сделать 1, а не 2. Я хотел бы сделать выбор и обновление в транзакции, чтобы убедиться, что произойдет и то, и другое.Но я не хочу выполнять doSomething () в той же транзакции.В случае исключения в doSomething (), я все еще хочу сохранить обновления, сделанные во время опроса.Как я могу достичь этого?

1 Ответ

0 голосов
/ 19 декабря 2018

Это делается с помощью простого смещения потока.Итак, вам нужно просто выйти из потока опроса, разрешить ему передать TX и продолжить процесс в отдельном потоке.

Согласно вашей логике с .split(), еще лучше иметь обработку нового потока уже послеразделение, поэтому элементы будут даже обрабатываться этим doSomething() параллельно.

Цель просто может быть достигнута с помощью ExecutorChannel.Поскольку у вас уже есть taskSourceChannel(), просто замените его на ExecutorChannel на основе некоторого управляемого ThreadPoolTaskExecutor.

См. Дополнительную информацию в Справочном руководстве: https://docs.spring.io/spring-integration/reference/html/messaging-channels-section.html#channel-configuration-executorchannel

и его Javadocs.

Простой вариант конфигурации Java выглядит следующим образом:

    @Bean
    public MessageChannel taskSourceChannel() {
        return new ExecutorChannel(executor());
    }

    @Bean
    public Executor executor() {
        return new ThreadPoolTaskExecutor();
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...