Я новичок в разработке Java / Hibernate / Seam, но у меня возникла странная проблема с Hibernate и параллельными потоками.
У меня есть компонент Seam в области приложения, который выполняется через таймеры EJB вустановленный интервал (Orchestrator.java), вызывающий метод startProcessingWorkloads .
Этот метод имеет внедренный EntityManager, который он использует для проверки базы данных на предмет сбора данных, и если он находит работуКоллекция создает новый компонент асинхронного шва (LoadContoller.java) и выполняет метод start () на контроллере.
. LoadController внедряет EntityManager и использует его для выполнения очень большой транзакции (О программеодин час)
Как только LoadController работает как отдельный поток, Orchestrator все еще выполняется как поток с заданным интервалом, например,
1min
Orchestrator - ищетсобрание работ (не найдено) (тема 1)
2мин
Orchestrator - ищет коллекцию работ (находит один, запускает LoadController) (поток 1)
LoadController - запускает обновление записей базы данных (поток 2)
3min
Orchestrator - ищет коллекцию работ (Ничего не найдено) (поток 1)
LoadController - все еще обновляет записи базы данных (поток 2)
4мин
Orchestrator - ищет коллекцию работ (не найдено) (поток 1)
LoadController - Stillобновление записей базы данных (поток 2)
5 минут
Orchestrator - поиск коллекции работ (ничего не найдено) (поток 1)
LoadController - выполнение обновления записей базы данных (поток 2)
6мин
Orchestrator - ищет коллекцию работ (не найдено) (поток 1)
7мин
Orchestrator - ищет сборник работ (не найдено) (поток 1)
Однако я получаю периодически возникающую ошибку (см. Ниже), когда Orchestrator работает одновременно с LoadController.
5: 10: 40,852 WARN [AbstractBatcher] exception очистка maxRows / queryTimeout java.sql.SQLException: соединение не связано с управляемым connection.org.jboss.resource.adapter.jdbc.jdk6.WrappedConnectionJDK6@1fcdb21
Эта ошибка возникает послеOrchestrator завершил свой запрос SQl, и когда LoadController пытается выполнить свой следующий запрос SQl.
Я провел некоторое исследование и пришел к выводу, что EntityManager закрывается, поэтому LoadController не смог его использовать.
Теперь запутавшись в том, что именно закрыло соединение, я сделал несколько базовых дампов объектов менеджера сущностей, используемых Orchestrator и LoadController, когда каждый из компонентов вызывается, и я обнаружил, что непосредственно перед тем, как я получил вышеупомянутыйвозникает ошибка.
2010-07-30 15: 06: 40,804 ИНФО [processManagement.LoadController] (pool-15-thread-2) org.jboss.seam.persistence.EntityManagerProxy@7e3da1
2010-07-30 15: 10: 40,758 ИНФОРМАЦИЯ [processManagement.Orchestrator] (pool-15-thread-1) org.jboss.seam.persistence.EntityManagerProxy@7e3da1
Похоже, что в течение одного из интервалов выполнения Orchestrator он получает ссылку на тот же EntityManager, который в настоящее время использует LoadController.Когда Orchestrator завершает свое выполнение SQL, он закрывает соединение, и тогда LoadController больше не может выполнять свои обновления.
Поэтому мой вопрос: кто-нибудь знает об этом, или я получил все мои потоки в этомcode?
Насколько я понимаю, при внедрении EntityManager новый экземпляр внедряется из EntityManagerFactory, который остается с этим конкретным объектом до тех пор, пока объект не покинет область действия (в этом случае они не сохраняют состояние, поэтому когда start () методы заканчиваются), как один и тот же экземпляр менеджера сущностей может быть введен в два отдельных потока?
Orchestrator.java
@Name("processOrchestrator")
@Scope(ScopeType.APPLICATION)
@AutoCreate
public class Orchestrator {
//___________________________________________________________
@Logger Log log;
@In EntityManager entityManager;
@In LoadController loadController;
@In WorkloadManager workloadManager;
//___________________________________________________________
private int fProcessInstanceCount = 0;
//___________________________________________________________
public Orchestrator() {}
//___________________________________________________________
synchronized private void incrementProcessInstanceCount() {
fProcessInstanceCount++;
}
//___________________________________________________________
synchronized private void decreaseProcessInstanceCount() {
fProcessInstanceCount--;
}
//___________________________________________________________
@Observer("controllerExceptionEvent")
synchronized public void controllerExceptionListiner(Process aProcess, Exception aException) {
decreaseProcessInstanceCount();
log.info(
"Controller " + String.valueOf(aProcess) +
" failed with the error [" + aException.getMessage() + "]"
);
Events.instance().raiseEvent(
Application.ApplicationEvent.applicationExceptionEvent.name(),
aException,
Orchestrator.class
);
}
//___________________________________________________________
@Observer("controllerCompleteEvent")
synchronized public void successfulControllerCompleteListiner(Process aProcess, long aWorkloadId) {
try {
MisWorkload completedWorklaod = entityManager.find(MisWorkload.class, aWorkloadId);
workloadManager.completeWorkload(completedWorklaod);
} catch (Exception ex) {
log.error(ex.getMessage(), ex);
}
decreaseProcessInstanceCount();
log.info("Controller " + String.valueOf(aProcess) + " completed successfuly");
}
//___________________________________________________________
@Asynchronous
public void startProcessingWorkloads(@IntervalDuration long interval) {
log.info("Polling for workloads.");
log.info(entityManager.toString());
try {
MisWorkload pendingWorkload = workloadManager.getNextPendingWorkload();
if (pendingWorkload != null) {
log.info(
"Pending Workload found (Workload_Id = " +
String.valueOf(pendingWorkload.getWorkloadId()) +
"), starting process controller."
);
Process aProcess = pendingWorkload.retriveProcessIdAsProcess();
ControllerIntf controller = createWorkloadController(aProcess);
if (controller != null) {
controller.start(aProcess, pendingWorkload.getWorkloadId());
workloadManager.setWorkloadProcessing(pendingWorkload);
}
}
} catch (Exception ex) {
Events.instance().raiseEvent(
Application.ApplicationEvent.applicationExceptionEvent.name(),
ex,
Orchestrator.class
);
}
log.info("Polling complete.");
}
//___________________________________________________________
private ControllerIntf createWorkloadController(Process aProcess) {
ControllerIntf newController = null;
switch(aProcess) {
case LOAD:
newController = loadController;
break;
default:
log.info(
"createWorkloadController() does not know the value (" +
aProcess.name() +
") no controller will be started."
);
}
// If a new controller is created than increase the
// count of started controllers so that we know how
// many are running.
if (newController != null) {
incrementProcessInstanceCount();
}
return newController;
}
//___________________________________________________________
}
LoadController.java
@Name("loadController")
@Scope(ScopeType.STATELESS)
@AutoCreate
public class LoadController implements ControllerIntf {
//__________________________________________________
@Logger private Log log;
@In private EntityManager entityManager;
//__________________________________________________
private String fFileName = "";
private String fNMDSFileName = "";
private String fAddtFileName = "";
//__________________________________________________
public LoadController(){ }
//__________________________________________________
@Asynchronous
synchronized public void start(Process aProcess, long aWorkloadId) {
log.info(
LoadController.class.getName() +
" process thread was started for WorkloadId [" +
String.valueOf(aWorkloadId) + "]."
);
log.info(entityManager.toString());
try {
Query aQuery = entityManager.createQuery(
"from MisLoad MIS_Load where Workload_Id = " + String.valueOf(aWorkloadId)
);
MisLoad misLoadRecord = (MisLoad)aQuery.getSingleResult();
fFileName =
misLoadRecord.getInitiatedBy().toUpperCase() + "_" +
misLoadRecord.getMdSourceSystem().getMdState().getShortName() + "_" +
DateUtils.now(DateUtils.FORMAT_FILE) + ".csv"
;
fNMDSFileName = "NMDS_" + fFileName;
fAddtFileName = "Addt_" + fFileName;
createDataFile(misLoadRecord.getFileContents());
ArrayList<String>sasCode = generateSASCode(
misLoadRecord.getLoadId(),
misLoadRecord.getMdSourceSystem().getPreloadFile()
);
//TODO: As the sas password will be encrypted in the database, we will
// need to decrypt it before passing to the below function
executeLoadSASCode(
sasCode,
misLoadRecord.getInitiatedBy(),
misLoadRecord.getSasPassword()
);
createWorkloadContentRecords(aWorkloadId, misLoadRecord.getLoadId());
//TODO: Needs to remove password from DB when complete
removeTempCSVFiles();
Events.instance().raiseEvent(
Application.ApplicationEvent.controllerCompleteEvent.name(),
aProcess,
aWorkloadId
);
log.info(LoadController.class.getName() + " process thread completed.");
} catch (Exception ex) {
Events.instance().raiseEvent(
Application.ApplicationEvent.controllerExceptionEvent.name(),
aProcess,
ex
);
}
}
//__________________________________________________
private void createDataFile(byte[] aFileContent) throws Exception {
File dataFile =
new File(ECEConfig.getConfiguration().sas_tempFileDir() + "\\" + fFileName);
FileUtils.writeBytesToFile(dataFile, aFileContent, true);
}
//__________________________________________________
private ArrayList<String> generateSASCode(long aLoadId, String aSourceSystemPreloadSasFile) {
String sasTempDir = ECEConfig.getConfiguration().sas_tempFileDir();
ArrayList<String> sasCode = new ArrayList<String>();
sasCode.add("%let sOracleUserId = " + ECEConfig.getConfiguration().oracle_username() + ";");
sasCode.add("%let sOraclePassword = " + ECEConfig.getConfiguration().oracle_password() + ";");
sasCode.add("%let sOracleSID = " + ECEConfig.getConfiguration().oracle_sid() + ";");
sasCode.add("%let sSchema = " + ECEConfig.getConfiguration().oracle_username() + ";");
sasCode.add("%let sECESASSourceDir = " + ECEConfig.getConfiguration().sas_sourceDir() + ";");
sasCode.add("libname lOracle ORACLE user=&sOracleUserId pw=&sOraclePassword path=&sOracleSID schema=&sSchema;");
sasCode.add("%let sCommaDelimiter = %str(" + ECEConfig.getConfiguration().dataload_csvRawDataFileDelimiter() + ");");
sasCode.add("%let sPipeDelimiter = %nrquote(" + ECEConfig.getConfiguration().dataload_csvNMDSDataFileDelimiter() + ");");
sasCode.add("%let sDataFileLocation = " + sasTempDir + "\\" + fFileName + ";");
sasCode.add("%let sNMDSOutputDataFileLoc = " + sasTempDir + "\\" + fNMDSFileName + ";");
sasCode.add("%let sAddtOutputDataFileLoc = " + sasTempDir + "\\" + fAddtFileName + ";");
sasCode.add("%let iLoadId = " + String.valueOf(aLoadId) + ";");
sasCode.add("%include \"&sECESASSourceDir\\ECE_UtilMacros.sas\";");
sasCode.add("%include \"&sECESASSourceDir\\" + aSourceSystemPreloadSasFile + "\";");
sasCode.add("%include \"&sECESASSourceDir\\ECE_NMDSLoad.sas\";");
sasCode.add("%preload(&sDataFileLocation, &sCommaDelimiter, &sNMDSOutputDataFileLoc, &sAddtOutputDataFileLoc, &sPipeDelimiter);");
sasCode.add("%loadNMDS(lOracle, &sNMDSOutputDataFileLoc, &sAddtOutputDataFileLoc, &sPipeDelimiter, &iLoadId);");
return sasCode;
}
//__________________________________________________
private void executeLoadSASCode(
ArrayList<String> aSasCode, String aUserName, String aPassword) throws Exception
{
SASExecutor aSASExecutor = new SASExecutor(
ECEConfig.getConfiguration().sas_server(),
ECEConfig.getConfiguration().sas_port(),
aUserName,
aPassword
);
aSASExecutor.execute(aSasCode);
log.info(aSASExecutor.getCompleteSasLog());
}
//__________________________________________________
/**
* Creates the MIS_UR_Workload_Contents records for
* the ECE Unit Record data that was just loaded
*
* @param aWorkloadId
* @param aMisLoadId
* @throws Exception
*/
private void createWorkloadContentRecords(long aWorkloadId, long aMisLoadId) throws Exception {
String selectionRule =
" from EceUnitRecord ECE_Unit_Record where ECE_Unit_Record.loadId = " +
String.valueOf(aMisLoadId)
;
MisWorkload misWorkload = entityManager.find(MisWorkload.class, aWorkloadId);
SeamManualTransaction manualTx = new SeamManualTransaction(
entityManager,
ECEConfig.getConfiguration().manualSeamTxTimeLimit()
);
manualTx.begin();
RecordPager oPager = new RecordPager(
entityManager,
selectionRule,
ECEConfig.getConfiguration().recordPagerDefaultPageSize()
);
Object nextRecord = null;
while ((nextRecord = oPager.getNextRecord()) != null) {
EceUnitRecord aEceUnitRecord = (EceUnitRecord)nextRecord;
MisUrWorkloadContents aContentsRecord = new MisUrWorkloadContents();
aContentsRecord.setEceUnitRecordId(aEceUnitRecord.getEceUnitRecordId());
aContentsRecord.setMisWorkload(misWorkload);
aContentsRecord.setProcessOutcome('C');
entityManager.persist(aContentsRecord);
}
manualTx.commit();
}
/**
* Removes the CSV temp files that are created for input
* into the SAS server and that are created as output.
*/
private void removeTempCSVFiles() {
String sasTempDir = ECEConfig.getConfiguration().sas_tempFileDir();
File dataInputCSV = new File(sasTempDir + "\\" + fFileName);
File nmdsOutputCSV = new File(sasTempDir + "\\" + fNMDSFileName);
File addtOutputCSV = new File(sasTempDir + "\\" + fAddtFileName);
if (dataInputCSV.exists()) {
dataInputCSV.delete();
}
if (nmdsOutputCSV.exists()) {
nmdsOutputCSV.delete();
}
if (addtOutputCSV.exists()) {
addtOutputCSV.delete();
}
}
}
SeamManualTransaction.java
public class SeamManualTransaction {
//___________________________________________________________
private boolean fObjectUsed = false;
private boolean fJoinExistingTransaction = true;
private int fTransactionTimeout = 60; // Default: 60 seconds
private UserTransaction fUserTx;
private EntityManager fEntityManager;
//___________________________________________________________
/**
* Set the transaction timeout in milliseconds (from minutes)
*
* @param aTimeoutInMins The number of minutes to keep the transaction active
*/
private void setTransactionTimeout(int aTimeoutInSecs) {
// 60 * aTimeoutInSecs = Timeout in Seconds
fTransactionTimeout = 60 * aTimeoutInSecs;
}
//___________________________________________________________
/**
* Constructor
*
* @param aEntityManager
*/
public SeamManualTransaction(EntityManager aEntityManager) {
fEntityManager = aEntityManager;
}
//___________________________________________________________
/**
* Constructor
*
* @param aEntityManager
* @param aTimeoutInSecs
*/
public SeamManualTransaction(EntityManager aEntityManager, int aTimeoutInSecs) {
setTransactionTimeout(aTimeoutInSecs);
fEntityManager = aEntityManager;
}
//___________________________________________________________
/**
* Constructor
*
* @param aEntityManager
* @param aTimeoutInSecs
* @param aJoinExistingTransaction
*/
public SeamManualTransaction(EntityManager aEntityManager, int aTimeoutInSecs, boolean aJoinExistingTransaction) {
setTransactionTimeout(aTimeoutInSecs);
fJoinExistingTransaction = aJoinExistingTransaction;
fEntityManager = aEntityManager;
}
//___________________________________________________________
/**
* Starts the new transaction
*
* @throws Exception
*/
public void begin() throws Exception {
if (fObjectUsed) {
throw new Exception(
SeamManualTransaction.class.getCanonicalName() +
" has been used. Create new instance."
);
}
fUserTx =
(UserTransaction) org.jboss.seam.Component.getInstance("org.jboss.seam.transaction.transaction");
fUserTx.setTransactionTimeout(fTransactionTimeout);
fUserTx.begin();
/* If entity manager is created before the transaction
* is started (ie. via Injection) then it must join the
* transaction
*/
if (fJoinExistingTransaction) {
fEntityManager.joinTransaction();
}
}
//___________________________________________________________
/**
* Commit the transaction to the database
*
* @throws Exception
*/
public void commit() throws Exception {
fObjectUsed = true;
fUserTx.commit();
}
// ___________________________________________________________
/ **
* Откатывает транзакцию обратно
*
* Исключения @throws
* /
public void rollback () создает исключение {
fObjectUsed = true;
fUserTx.rollback ();
}
// ___________________________________________________________
}