Spring Integration: запустите новую транзакцию в моем потоке сообщений IntegrationFlowBuilder, чтобы зафиксировать изменение и возобновить внешнюю транзакцию - PullRequest
0 голосов
/ 20 сентября 2019

Мой jdbcSourceMessage выполняет выбор для обновления с пакетом из 100 строк одновременно.В то время как IntegrationFlow был выполнен в Транзакции для удержания блокировки базы данных для извлеченного пакета.Я хотел бы начать новую транзакцию для моего JdbcSourceUpdate (в потоке сообщений), чтобы выполнить обновление и зафиксировать мои изменения для каждой строки, отправленной через канал.

@Bean
public IntegrationFlow integrationFlow() {
    IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(jdbcSourceMessage());
    flowBuilder
            .split()
            .log(LoggingHandler.Level.INFO, message ->
                    message.getHeaders().get("sequenceNumber")
                            + " événements publiés sur le bus de message sur "
                            + message.getHeaders().get("sequenceSize")
                            + " événements lus (lot)")
            .transform(Transformers.toJson())
            .log()
            .enrichHeaders(h -> h.headerExpression("type", "payload.typ_evenement"))
            .publishSubscribeChannel(publishSubscribeSpec -> publishSubscribeSpec
            .subscribe(flow -> flow
                    .bridge()
                    .transform(Transformers.toJson())
                    .transform(kafkaGuyTransformer())
                    .channel(this.rabbitMQchannel.demandeInscriptionOutput())) 
            .subscribe(flow -> flow
                    .handle(jdbcMessageHandler())) 
    );
    return flowBuilder.get();
}


@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PeriodicTrigger trigger = new PeriodicTrigger(this.proprietesSourceJdbc.getTriggerDelay(), TimeUnit.SECONDS);
    PollerMetadata pollerMetadata = Pollers.trigger(trigger)
            .advice(transactionInterceptor())
            .get();
    pollerMetadata.setMaxMessagesPerPoll(proprietesSourceJdbc.getMaxRowsPerPoll());
    return pollerMetadata;
}

@Bean
public JdbcSourceUpdate jdbcSourceUpdate() {
    return new JdbcSourceUpdate();
}

public TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder()
            .transactionManager(transactionManager())
            .build();
}

public PlatformTransactionManager transactionManager(){
    DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(sourceDeDonnees);
    transactionManager.setRollbackOnCommitFailure(false);
    return transactionManager;
}


public class KafkaGuyTransformer implements GenericTransformer<Message, Message> {

    @Override
    public Message transform(Message message) {
        Message<String> msg = null;
        try {
            DemandeRecueDTO dto = objectMapper.readValue(message.getPayload().toString(), DemandeRecueDTO.class);
            msg = MessageBuilder.withPayload(dto.getTxtDonnee())
                    .copyHeaders(message.getHeaders())
                    .build();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
        return msg;
    }
}

public class JdbcSourceUpdate implements MessageHandler {
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        try {
            Thread.sleep(100);
            DemandeRecueDTO dto = objectMapper.readValue(message.getPayload().toString(), DemandeRecueDTO.class);
            jdbcTemplate.update(proprietesSourceJdbc.getUpdate(), dto.getIdEvenementDemandeCrcd());
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }
}

1 Ответ

1 голос
/ 20 сентября 2019

Поскольку у вас есть реализация JdbcSourceUpdate, достаточно просто сделать следующее:

@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void handleMessage(Message<?> message) throws MessagingException {

Для получения дополнительной информации см. JavaDocs:

/**
 * Create a new transaction, and suspend the current transaction if one exists.
 * Analogous to the EJB transaction attribute of the same name.
 * <p><b>NOTE:</b> Actual transaction suspension will not work out-of-the-box
 * on all transaction managers. This in particular applies to
 * {@link org.springframework.transaction.jta.JtaTransactionManager},
 * which requires the {@code javax.transaction.TransactionManager} to be
 * made available to it (which is server-specific in standard Java EE).
 * @see org.springframework.transaction.jta.JtaTransactionManager#setTransactionManager
 */
REQUIRES_NEW(TransactionDefinition.PROPAGATION_REQUIRES_NEW),

ОБНОВЛЕНИЕ

Обратите внимание на ПРИМЕЧАНИЕ, хотя:

 * Actual transaction suspension will not work out-of-the-box
 * on all transaction managers. This in particular applies to
 * {@link org.springframework.transaction.jta.JtaTransactionManager}`. 

Итак, звучит как DataSourceTransactionManager не работает с подвеской.Я могу предложить вам использовать .gateway() для этого JdbcSourceUpdate, но с использованием ExecutorChannel.Таким образом, ваш handle(jdbcSourceUpdate() будет выполнен в новом потоке и, следовательно, с новой транзакцией.Основной поток будет ожидать ответа от того шлюза, в котором была открыта его транзакция.

Примерно так:

                        .subscribe(f -> f
                                .gateway(subFlow ->
                                        subFlow.channel(c -> c.executor())
                                                .handle(jdbcMessageHandler()))
                                .channel("nullChannel")
                        ));

Купите ваш JdbcSourceUpdate для возврата ответа шлюза.Предположим, что вы не реализовали MessageHandler там, а сделали просто как обычный POJO с одним не void методом.

...