Менеджер транзакций не откатывается на задание Spring Batch - PullRequest
0 голосов
/ 13 февраля 2019

У меня есть проблема, когда мне нужно прочитать «необработанные» данные из базы данных SQL Server, обработать их, затем выборочно обновить две-шесть таблиц в базе данных DB2, а затем пометить эти данные как обработанные в исходной базе данных на SQLСервер.В любой момент, если что-то не получится, я хочу откатить все обновления.Если у меня есть 10 необработанных предметов и 9 хороших, но один не работает, я все же хочу, чтобы 9 хороших завершили и десятый вернулись в исходное состояние, пока мы не сможем исследовать проблему и внести исправления.

Общая архитектура такова, что один входной экземпляр может привести к вставке как минимум в 3 таблицы DB2 и до 7 таблиц.Несколько таблиц DB2 могут заканчиваться несколькими вставками из одного ввода.Для каждого обновления таблицы мне нужно было бы разработать отдельного писателя и выяснить, как передать каждому писателю конкретные данные, необходимые для этой таблицы.Мне также нужно использовать 2 источника данных для обновлений DB2 и SQL Server соответственно.

Я не опытный разработчик Spring Batch.И у меня редко бывает проект, где я могу «прочитать 1, процесс 1, написать 1» и повторить.Обычно мне нужно прочитать несколько файлов / баз данных, обработать эти данные, а затем записать в один или несколько отчетов, файлов и / или баз данных.Я вижу, где предоставляется поддержка для такого рода приложений, но это более сложный процесс, требующий дополнительных исследований и ограниченных примеров.

Пытаясь найти решение, я выбрал легкий путь.Я разработал класс, который реализует Tasklet и написал код так, как работает мой процесс в реальном времени.Он извлекает входные данные из SQL с использованием JDBCTemplate, затем передает данные в код, который обрабатывает данные и определяет, что необходимо обновить.У меня есть класс диспетчера транзакций, который реализует @Transactional с REQUIRES_NEW и откатом для моего пользовательского непроверенного исключения.Класс Transactional перехватывает все события DataAccessException и генерирует пользовательское исключение.В настоящее время я использую только источник данных DB2, чтобы не усложнять ситуацию.

В своем тестировании я добавил код в конце процесса обновления, который выдает непроверенное исключение.Я ожидал, что обновления будут отменены.Но этого не произошло.При повторном запуске процесса я получаю 803 ошибки в DB2.

И последнее.В нашем магазине мы должны использовать хранимые процедуры в DB2 для любого доступа.Поэтому я использую SimpleJdbcCall для выполнения SP.

Вот мой код:

Основной Java-класс для тасклета:

public class SynchronizeDB2WithSQL   implements Tasklet
{

private static final BatchLogger logger = BatchLogger.getLogger();    

private Db2UpdateTranManager tranMgr;
public void setTranMgr(Db2UpdateTranManager tranMgr) {
    this.tranMgr = tranMgr;
}

private AccessPaymentIntegrationDAO pmtIntDAO;
public void setPmtIntDAO(AccessPaymentIntegrationDAO pmtIntDAO) {
    this.pmtIntDAO = pmtIntDAO;
}

@Override
public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
    logger.logInfoMessage("=============================================");
    logger.logInfoMessage("   EB0255IA - Synchronize DB2 with SQL");
    logger.logInfoMessage("=============================================");

    List<UnprocessedPaymentDataBean> orderList = this.pmtIntDAO.fetchUnprocessedEntries();

    if(CollectionUtils.isNotEmpty(orderList)) {
        for(UnprocessedPaymentDataBean ent: orderList) {
            logger.logDebugMessage("  Processing payment ");
            logger.logDebugMessage(ent.toString());
            Map<String, List<PaymentTransactionDetailsBean>> paymentDetails = arrangePayments(this.pmtIntDAO.getDetailsByOrder(ent.getOrderNbr()));
            try {
                this.tranMgr.createNewAuthorizedPayment(ent, paymentDetails);
            } catch (DataException e) {
                logger.logErrorMessage("Encountered a Data Exception: "+e);
            }
        }
    } else {
        logger.logInfoMessage("=============================================");
        logger.logInfoMessage("No data was encountered that needed to be processed");
        logger.logInfoMessage("=============================================");
    }

    return RepeatStatus.FINISHED;
}

и Spring Batch xml:

<job id="EB0255IA" parent="baseJob" job-repository="jobRepository"
    xmlns="http://www.springframework.org/schema/batch" restartable="true"
    incrementer="parameterIncrementer">
    <description>Job to maintain the DB2 updates for payment activity</description>         
    <step id="SynchronizeDB2WithSQL">
        <tasklet ref="synchronizeTasklet" />
    </step> 
</job>

<bean id="synchronizeTasklet" class="com.ins.pmtint.synchdb2.SynchronizeDB2WithSQL" >
    <property name="pmtIntDAO" ref="pmtIntDAO" />
    <property name="tranMgr" ref="db2TranMgr" />    
</bean>

<bean id="jdbcUpdateDB2" class="com.ins.pmtint.db.JDBCUpdateDB2">
    <property name="dataSource" ref="dataSourceBnkDB2" />
</bean>

<bean id="updateDB2DataDAO" class="com.ins.pmtint.db.dao.UpdateDB2DataDAOImpl">
    <property name="jdbcUpdateDB2" ref="jdbcUpdateDB2" />
</bean>

<bean id="db2TranMgr" class="com.ins.pmtint.db.tranmgr.Db2UpdateTranManagerImpl">
    <property name="updateDB2DataDAO" ref="updateDB2DataDAO" />
</bean>

<bean id="jdbcPaymentIntegration" class="com.ins.pmtint.db.JDBCPaymentIntegration" >
    <property name="dataSource" ref="dataSourcePmtIntegration" />
</bean>

<bean id="pmtIntDAO" class="com.ins.pmtint.db.dao.AccessPaymentIntegrationDAOImpl">
    <property name="jdbcPaymentIntegration" ref="jdbcPaymentIntegration" />
</bean>

Часть реализации менеджера транзакций.

public class Db2UpdateTranManagerImpl implements Db2UpdateTranManager, DB2FieldNames {

private static final BatchLogger logger = BatchLogger.getLogger();

UpdateDB2DataDAO updateDB2DataDAO;
public void setUpdateDB2DataDAO(UpdateDB2DataDAO updateDB2DataDAO) {
    this.updateDB2DataDAO = updateDB2DataDAO;
}

@Override
@Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = false, rollbackFor = DataException.class)
public void createNewAuthorizedPayment(UnprocessedPaymentDataBean dataBean, Map<String, List<PaymentTransactionDetailsBean>> paymentDetails) {
    logger.logDebugMessage("At Db2UpdateTranManagerImpl.createNewAuthorizedPayment(");
    logger.logDebugMessage(dataBean.toString());
    String orderNbr = String.valueOf(dataBean.getOrderNbr());
    String eventCode = TranTypeCode.fromValue(dataBean.getTransactionTypeCode()).getDB2Event();
    if(eventCode == null) {
        try {
            KFBDistBatchEMail.createAndSendMessage("There is no event code for current entry\n\nOrder: "+orderNbr+"  Tran type: "+dataBean.getTransactionTypeCode(), "EB0255IA - Database error" ,EnhancedPropertyPlaceholderConfigurer.getEmailFrom(), EnhancedPropertyPlaceholderConfigurer.getEmailTo(), null);
            throw new DataException("Update failed:  No event code to apply");
        } catch (EMailExcpetion e2) {
            logger.logErrorMessage("Generating email", e2);
        }
    }
    String orginatingSystemId;
    if (dataBean.getPaymentTypeCode().equalsIgnoreCase("EFT"))
            orginatingSystemId = "FS";
        else
            orginatingSystemId = "IN";

    try {
        if(dataBean.getTransactionTypeCode().equalsIgnoreCase("A")) {
            this.updateDB2DataDAO.updatePaymentDetails(orderNbr, DB_INITIAL_EVENT_CODE, "", dataBean.getTransactionAmt(), orginatingSystemId);
        } 

**** FOR TESTING - AT THE END I HAVE ADDED ****
    throw new DataException("I finished processing and backed out. \n\n"+dataBean);
}

И это часть кода JDBC:

public class JDBCUpdateDB2 extends JdbcDaoSupport 
                        implements DB2FieldNames
{
private static final BatchLogger logger = KFBBatchLogger.getLogger();

public void updatePaymentDetails(String orderNbr, String eventCd, String authnbr, Double amount, String orginatingSystemId) {


    SimpleJdbcCall jdbcCall = new SimpleJdbcCall(getDataSource()).withSchemaName(EnhancedPropertyPlaceholderConfigurer.getDB2Schema()).withProcedureName(UPDATE_PAYMENT_TRANSACTION_DB2_PROC);
    MapSqlParameterSource sqlIn = new MapSqlParameterSource();
    sqlIn.addValue(SP_BNKCRD_PMT_ORD_NBR, orderNbr);
    sqlIn.addValue(SP_CLUSTERING_NBR_2, new StringBuilder(orderNbr.substring(Math.max(orderNbr.length() - 2, 0))).reverse().toString());
    sqlIn.addValue(SP_BNKCRD_EVNT_CD, eventCd);
    sqlIn.addValue(SP_CCTRAN_ERR_CD, "N");
    sqlIn.addValue(SP_BNKCRD_PROC_RET_CD, "");
    sqlIn.addValue(SP_BNKCRD_AUTH_CD, "G");
    sqlIn.addValue(SP_ORIG_SYS_ID_TXT, orginatingSystemId);
    sqlIn.addValue(SP_BNKCRD_TRAN_AMT, amount);
    try {
        jdbcCall.execute(sqlIn);
    } catch (DataAccessException e) {
        logger.logErrorMessage("Database error in updatePaymentDetails", e);
        throw e;
    }
}

1 Ответ

0 голосов
/ 27 февраля 2019

Так как вам нужно записывать в несколько таблиц, вы можете использовать CompositeItemWriter, имеющий средство записи элемента делегата для каждой таблицы.В этом случае делегаты должны быть зарегистрированы как потоки на шаге.Вы также можете создать модуль записи одного элемента, который выдает 3 (или более) оператора вставки в разные таблицы (но я бы не рекомендовал это делать).

Если у меня есть 10 необработанных элементов и 9 хороших, но одинне удается Я все еще хочу, чтобы 9 хороших завершить и десятый вернуться в исходное состояние

Если вы используете отказоустойчивый шаг, и во время выполнения выдается исключение skippable При записи чанка Spring Batch будет сканировать чанк на предмет неисправного элемента (поскольку он не может знать, какой элемент вызвал ошибку).Технически Spring Batch установит размер чанка равным 1 и будет использовать одну транзакцию на элемент, поэтому откат будет выполняться только для неисправного элемента.Это позволяет вам выполнить вышеуказанное требование.Вот автономный пример, чтобы показать вам, как это работает:

import java.util.Arrays;
import java.util.List;
import javax.sql.DataSource;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.jdbc.JdbcTestUtils;

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = ChunkScanningTest.JobConfiguration.class)
public class ChunkScanningTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Before
    public void setUp() {
        jdbcTemplate.update("CREATE TABLE people (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
    }

    @Test
    public void testChunkScanningWhenSkippableExceptionInWrite() throws Exception {
        // given
        int peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
        Assert.assertEquals(0, peopleCount);

        // when
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();

        // then
        peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
        int fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
        int bazCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 3 and name = 'baz'");
        Assert.assertEquals(1, fooCount); // foo is inserted
        Assert.assertEquals(1, bazCount); // baz is inserted
        Assert.assertEquals(2, peopleCount); // bar is not inserted

        Assert.assertEquals(ExitStatus.COMPLETED.getExitCode(), jobExecution.getExitStatus().getExitCode());
        StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
        Assert.assertEquals(3, stepExecution.getCommitCount()); // one commit for foo + one commit for baz + one commit for the last (empty) chunk
        Assert.assertEquals(2, stepExecution.getRollbackCount()); // initial rollback for whole chunk + one rollback for bar
        Assert.assertEquals(2, stepExecution.getWriteCount()); // only foo and baz have been written
    }

    @Configuration
    @EnableBatchProcessing
    public static class JobConfiguration {

        @Bean
        public DataSource dataSource() {
            return new EmbeddedDatabaseBuilder()
                    .setType(EmbeddedDatabaseType.HSQL)
                    .addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
                    .addScript("/org/springframework/batch/core/schema-hsqldb.sql")
                    .build();
        }

        @Bean
        public JdbcTemplate jdbcTemplate(DataSource dataSource) {
            return new JdbcTemplate(dataSource);
        }

        @Bean
        public ItemReader<Person> itemReader() {
            Person foo = new Person(1, "foo");
            Person bar = new Person(2, "bar");
            Person baz = new Person(3, "baz");
            return new ListItemReader<>(Arrays.asList(foo, bar, baz));
        }

        @Bean
        public ItemWriter<Person> itemWriter() {
            return new PersonItemWriter(dataSource());
        }

        @Bean
        public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
            return jobBuilderFactory.get("job")
                    .start(stepBuilderFactory.get("step")
                            .<Person, Person>chunk(3)
                            .reader(itemReader())
                            .writer(itemWriter())
                            .faultTolerant()
                            .skip(IllegalStateException.class)
                            .skipLimit(10)
                            .build())
                    .build();
        }

        @Bean
        public JobLauncherTestUtils jobLauncherTestUtils() {
            return new JobLauncherTestUtils();
        }
    }

    public static class PersonItemWriter implements ItemWriter<Person> {

        private JdbcTemplate jdbcTemplate;

        PersonItemWriter(DataSource dataSource) {
            this.jdbcTemplate = new JdbcTemplate(dataSource);
        }

        @Override
        public void write(List<? extends Person> items) {
            System.out.println("Writing items: "); items.forEach(System.out::println);
            for (Person person : items) {
                if ("bar".equalsIgnoreCase(person.getName())) {
                    System.out.println("Throwing exception: No bars here!");
                    throw new IllegalStateException("No bars here!");
                }
                jdbcTemplate.update("INSERT INTO people (id, name) VALUES (?, ?)", person.getId(), person.getName());
            }
        }
    }

    public static class Person {

        private long id;

        private String name;

        public Person() {
        }

        Person(long id, String name) {
            this.id = id;
            this.name = name;
        }

        public long getId() {
            return id;
        }

        public void setId(long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
}

Этот пример печатает:

Writing items: 
Person{id=1, name='foo'}
Person{id=2, name='bar'}
Person{id=3, name='baz'}
Throwing exception: No bars here!
Writing items: 
Person{id=1, name='foo'}
Writing items: 
Person{id=2, name='bar'}
Throwing exception: No bars here!
Writing items: 
Person{id=3, name='baz'}

Как вы можете видеть, после того, как скиплинг был брошен, каждый кусок содержиттолько один элемент (Spring Batch сканирует элементы один за другим для определения неисправного), и записываются только действительные элементы.

с ограниченным числом примеров, которые можно найти

Я надеюсь, что этот пример проясняет эту особенность.Если вам нужен пример со средством составного элемента, посмотрите на этот вопрос / ответ: Как Spring Batch CompositeItemWriter управляет транзакцией для авторов делегатов?

Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...