У меня есть проблема, когда мне нужно прочитать «необработанные» данные из базы данных SQL Server, обработать их, затем выборочно обновить две-шесть таблиц в базе данных DB2, а затем пометить эти данные как обработанные в исходной базе данных на SQLСервер.В любой момент, если что-то не получится, я хочу откатить все обновления.Если у меня есть 10 необработанных предметов и 9 хороших, но один не работает, я все же хочу, чтобы 9 хороших завершили и десятый вернулись в исходное состояние, пока мы не сможем исследовать проблему и внести исправления.
Общая архитектура такова, что один входной экземпляр может привести к вставке как минимум в 3 таблицы DB2 и до 7 таблиц.Несколько таблиц DB2 могут заканчиваться несколькими вставками из одного ввода.Для каждого обновления таблицы мне нужно было бы разработать отдельного писателя и выяснить, как передать каждому писателю конкретные данные, необходимые для этой таблицы.Мне также нужно использовать 2 источника данных для обновлений DB2 и SQL Server соответственно.
Я не опытный разработчик Spring Batch.И у меня редко бывает проект, где я могу «прочитать 1, процесс 1, написать 1» и повторить.Обычно мне нужно прочитать несколько файлов / баз данных, обработать эти данные, а затем записать в один или несколько отчетов, файлов и / или баз данных.Я вижу, где предоставляется поддержка для такого рода приложений, но это более сложный процесс, требующий дополнительных исследований и ограниченных примеров.
Пытаясь найти решение, я выбрал легкий путь.Я разработал класс, который реализует Tasklet и написал код так, как работает мой процесс в реальном времени.Он извлекает входные данные из SQL с использованием JDBCTemplate, затем передает данные в код, который обрабатывает данные и определяет, что необходимо обновить.У меня есть класс диспетчера транзакций, который реализует @Transactional с REQUIRES_NEW и откатом для моего пользовательского непроверенного исключения.Класс Transactional перехватывает все события DataAccessException и генерирует пользовательское исключение.В настоящее время я использую только источник данных DB2, чтобы не усложнять ситуацию.
В своем тестировании я добавил код в конце процесса обновления, который выдает непроверенное исключение.Я ожидал, что обновления будут отменены.Но этого не произошло.При повторном запуске процесса я получаю 803 ошибки в DB2.
И последнее.В нашем магазине мы должны использовать хранимые процедуры в DB2 для любого доступа.Поэтому я использую SimpleJdbcCall для выполнения SP.
Вот мой код:
Основной Java-класс для тасклета:
public class SynchronizeDB2WithSQL implements Tasklet
{
private static final BatchLogger logger = BatchLogger.getLogger();
private Db2UpdateTranManager tranMgr;
public void setTranMgr(Db2UpdateTranManager tranMgr) {
this.tranMgr = tranMgr;
}
private AccessPaymentIntegrationDAO pmtIntDAO;
public void setPmtIntDAO(AccessPaymentIntegrationDAO pmtIntDAO) {
this.pmtIntDAO = pmtIntDAO;
}
@Override
public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
logger.logInfoMessage("=============================================");
logger.logInfoMessage(" EB0255IA - Synchronize DB2 with SQL");
logger.logInfoMessage("=============================================");
List<UnprocessedPaymentDataBean> orderList = this.pmtIntDAO.fetchUnprocessedEntries();
if(CollectionUtils.isNotEmpty(orderList)) {
for(UnprocessedPaymentDataBean ent: orderList) {
logger.logDebugMessage(" Processing payment ");
logger.logDebugMessage(ent.toString());
Map<String, List<PaymentTransactionDetailsBean>> paymentDetails = arrangePayments(this.pmtIntDAO.getDetailsByOrder(ent.getOrderNbr()));
try {
this.tranMgr.createNewAuthorizedPayment(ent, paymentDetails);
} catch (DataException e) {
logger.logErrorMessage("Encountered a Data Exception: "+e);
}
}
} else {
logger.logInfoMessage("=============================================");
logger.logInfoMessage("No data was encountered that needed to be processed");
logger.logInfoMessage("=============================================");
}
return RepeatStatus.FINISHED;
}
и Spring Batch xml:
<job id="EB0255IA" parent="baseJob" job-repository="jobRepository"
xmlns="http://www.springframework.org/schema/batch" restartable="true"
incrementer="parameterIncrementer">
<description>Job to maintain the DB2 updates for payment activity</description>
<step id="SynchronizeDB2WithSQL">
<tasklet ref="synchronizeTasklet" />
</step>
</job>
<bean id="synchronizeTasklet" class="com.ins.pmtint.synchdb2.SynchronizeDB2WithSQL" >
<property name="pmtIntDAO" ref="pmtIntDAO" />
<property name="tranMgr" ref="db2TranMgr" />
</bean>
<bean id="jdbcUpdateDB2" class="com.ins.pmtint.db.JDBCUpdateDB2">
<property name="dataSource" ref="dataSourceBnkDB2" />
</bean>
<bean id="updateDB2DataDAO" class="com.ins.pmtint.db.dao.UpdateDB2DataDAOImpl">
<property name="jdbcUpdateDB2" ref="jdbcUpdateDB2" />
</bean>
<bean id="db2TranMgr" class="com.ins.pmtint.db.tranmgr.Db2UpdateTranManagerImpl">
<property name="updateDB2DataDAO" ref="updateDB2DataDAO" />
</bean>
<bean id="jdbcPaymentIntegration" class="com.ins.pmtint.db.JDBCPaymentIntegration" >
<property name="dataSource" ref="dataSourcePmtIntegration" />
</bean>
<bean id="pmtIntDAO" class="com.ins.pmtint.db.dao.AccessPaymentIntegrationDAOImpl">
<property name="jdbcPaymentIntegration" ref="jdbcPaymentIntegration" />
</bean>
Часть реализации менеджера транзакций.
public class Db2UpdateTranManagerImpl implements Db2UpdateTranManager, DB2FieldNames {
private static final BatchLogger logger = BatchLogger.getLogger();
UpdateDB2DataDAO updateDB2DataDAO;
public void setUpdateDB2DataDAO(UpdateDB2DataDAO updateDB2DataDAO) {
this.updateDB2DataDAO = updateDB2DataDAO;
}
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = false, rollbackFor = DataException.class)
public void createNewAuthorizedPayment(UnprocessedPaymentDataBean dataBean, Map<String, List<PaymentTransactionDetailsBean>> paymentDetails) {
logger.logDebugMessage("At Db2UpdateTranManagerImpl.createNewAuthorizedPayment(");
logger.logDebugMessage(dataBean.toString());
String orderNbr = String.valueOf(dataBean.getOrderNbr());
String eventCode = TranTypeCode.fromValue(dataBean.getTransactionTypeCode()).getDB2Event();
if(eventCode == null) {
try {
KFBDistBatchEMail.createAndSendMessage("There is no event code for current entry\n\nOrder: "+orderNbr+" Tran type: "+dataBean.getTransactionTypeCode(), "EB0255IA - Database error" ,EnhancedPropertyPlaceholderConfigurer.getEmailFrom(), EnhancedPropertyPlaceholderConfigurer.getEmailTo(), null);
throw new DataException("Update failed: No event code to apply");
} catch (EMailExcpetion e2) {
logger.logErrorMessage("Generating email", e2);
}
}
String orginatingSystemId;
if (dataBean.getPaymentTypeCode().equalsIgnoreCase("EFT"))
orginatingSystemId = "FS";
else
orginatingSystemId = "IN";
try {
if(dataBean.getTransactionTypeCode().equalsIgnoreCase("A")) {
this.updateDB2DataDAO.updatePaymentDetails(orderNbr, DB_INITIAL_EVENT_CODE, "", dataBean.getTransactionAmt(), orginatingSystemId);
}
**** FOR TESTING - AT THE END I HAVE ADDED ****
throw new DataException("I finished processing and backed out. \n\n"+dataBean);
}
И это часть кода JDBC:
public class JDBCUpdateDB2 extends JdbcDaoSupport
implements DB2FieldNames
{
private static final BatchLogger logger = KFBBatchLogger.getLogger();
public void updatePaymentDetails(String orderNbr, String eventCd, String authnbr, Double amount, String orginatingSystemId) {
SimpleJdbcCall jdbcCall = new SimpleJdbcCall(getDataSource()).withSchemaName(EnhancedPropertyPlaceholderConfigurer.getDB2Schema()).withProcedureName(UPDATE_PAYMENT_TRANSACTION_DB2_PROC);
MapSqlParameterSource sqlIn = new MapSqlParameterSource();
sqlIn.addValue(SP_BNKCRD_PMT_ORD_NBR, orderNbr);
sqlIn.addValue(SP_CLUSTERING_NBR_2, new StringBuilder(orderNbr.substring(Math.max(orderNbr.length() - 2, 0))).reverse().toString());
sqlIn.addValue(SP_BNKCRD_EVNT_CD, eventCd);
sqlIn.addValue(SP_CCTRAN_ERR_CD, "N");
sqlIn.addValue(SP_BNKCRD_PROC_RET_CD, "");
sqlIn.addValue(SP_BNKCRD_AUTH_CD, "G");
sqlIn.addValue(SP_ORIG_SYS_ID_TXT, orginatingSystemId);
sqlIn.addValue(SP_BNKCRD_TRAN_AMT, amount);
try {
jdbcCall.execute(sqlIn);
} catch (DataAccessException e) {
logger.logErrorMessage("Database error in updatePaymentDetails", e);
throw e;
}
}