Seam Hibernate Обслуживает один и тот же экземпляр EntityManger двум отдельным потокам. - PullRequest
2 голосов
/ 30 июля 2010

Я новичок в разработке Java / Hibernate / Seam, но у меня возникла странная проблема с Hibernate и параллельными потоками.

У меня есть компонент Seam в области приложения, который выполняется через таймеры EJB вустановленный интервал (Orchestrator.java), вызывающий метод startProcessingWorkloads .

Этот метод имеет внедренный EntityManager, который он использует для проверки базы данных на предмет сбора данных, и если он находит работуКоллекция создает новый компонент асинхронного шва (LoadContoller.java) и выполняет метод start () на контроллере.

. LoadController внедряет EntityManager и использует его для выполнения очень большой транзакции (О программеодин час)

Как только LoadController работает как отдельный поток, Orchestrator все еще выполняется как поток с заданным интервалом, например,

1min
Orchestrator - ищетсобрание работ (не найдено) (тема 1)


2мин
Orchestrator - ищет коллекцию работ (находит один, запускает LoadController) (поток 1)
LoadController - запускает обновление записей базы данных (поток 2)


3min
Orchestrator - ищет коллекцию работ (Ничего не найдено) (поток 1)
LoadController - все еще обновляет записи базы данных (поток 2)

4мин
Orchestrator - ищет коллекцию работ (не найдено) (поток 1)
LoadController - Stillобновление записей базы данных (поток 2)


5 минут
Orchestrator - поиск коллекции работ (ничего не найдено) (поток 1)
LoadController - выполнение обновления записей базы данных (поток 2)


6мин
Orchestrator - ищет коллекцию работ (не найдено) (поток 1)
7мин
Orchestrator - ищет сборник работ (не найдено) (поток 1)

Однако я получаю периодически возникающую ошибку (см. Ниже), когда Orchestrator работает одновременно с LoadController.

5: 10: 40,852 WARN [AbstractBatcher] exception очистка maxRows / queryTimeout java.sql.SQLException: соединение не связано с управляемым connection.org.jboss.resource.adapter.jdbc.jdk6.WrappedConnectionJDK6@1fcdb21

Эта ошибка возникает послеOrchestrator завершил свой запрос SQl, и когда LoadController пытается выполнить свой следующий запрос SQl.

Я провел некоторое исследование и пришел к выводу, что EntityManager закрывается, поэтому LoadController не смог его использовать.

Теперь запутавшись в том, что именно закрыло соединение, я сделал несколько базовых дампов объектов менеджера сущностей, используемых Orchestrator и LoadController, когда каждый из компонентов вызывается, и я обнаружил, что непосредственно перед тем, как я получил вышеупомянутыйвозникает ошибка.

2010-07-30 15: 06: 40,804 ИНФО [processManagement.LoadController] (pool-15-thread-2) org.jboss.seam.persistence.EntityManagerProxy@7e3da1

2010-07-30 15: 10: 40,758 ИНФОРМАЦИЯ [processManagement.Orchestrator] (pool-15-thread-1) org.jboss.seam.persistence.EntityManagerProxy@7e3da1

Похоже, что в течение одного из интервалов выполнения Orchestrator он получает ссылку на тот же EntityManager, который в настоящее время использует LoadController.Когда Orchestrator завершает свое выполнение SQL, он закрывает соединение, и тогда LoadController больше не может выполнять свои обновления.

Поэтому мой вопрос: кто-нибудь знает об этом, или я получил все мои потоки в этомcode?

Насколько я понимаю, при внедрении EntityManager новый экземпляр внедряется из EntityManagerFactory, который остается с этим конкретным объектом до тех пор, пока объект не покинет область действия (в этом случае они не сохраняют состояние, поэтому когда start () методы заканчиваются), как один и тот же экземпляр менеджера сущностей может быть введен в два отдельных потока?

Orchestrator.java

@Name("processOrchestrator")
@Scope(ScopeType.APPLICATION)
@AutoCreate 
public class Orchestrator {

  //___________________________________________________________

  @Logger Log log;

  @In EntityManager entityManager;

  @In LoadController loadController;

  @In WorkloadManager workloadManager;

  //___________________________________________________________

  private int fProcessInstanceCount = 0;

  //___________________________________________________________

  public Orchestrator() {}

  //___________________________________________________________

  synchronized private void incrementProcessInstanceCount() {
    fProcessInstanceCount++;
  }

  //___________________________________________________________

  synchronized private void decreaseProcessInstanceCount() {
    fProcessInstanceCount--;
  }

  //___________________________________________________________

  @Observer("controllerExceptionEvent") 
  synchronized public void controllerExceptionListiner(Process aProcess, Exception aException) {
    decreaseProcessInstanceCount();

    log.info(
      "Controller " + String.valueOf(aProcess) + 
      " failed with the error [" + aException.getMessage() + "]"
    );

    Events.instance().raiseEvent(
      Application.ApplicationEvent.applicationExceptionEvent.name(), 
      aException,
      Orchestrator.class
    );
  }

  //___________________________________________________________

  @Observer("controllerCompleteEvent") 
  synchronized public void successfulControllerCompleteListiner(Process aProcess, long aWorkloadId) {
    try {
      MisWorkload completedWorklaod = entityManager.find(MisWorkload.class, aWorkloadId);
      workloadManager.completeWorkload(completedWorklaod);
    } catch (Exception ex) {
      log.error(ex.getMessage(), ex);
    }

    decreaseProcessInstanceCount();

    log.info("Controller " + String.valueOf(aProcess) + " completed successfuly");
  }

  //___________________________________________________________

  @Asynchronous
  public void startProcessingWorkloads(@IntervalDuration long interval) {
    log.info("Polling for workloads.");
    log.info(entityManager.toString());
    try {
      MisWorkload pendingWorkload = workloadManager.getNextPendingWorkload();

      if (pendingWorkload != null) {
        log.info(
          "Pending Workload found (Workload_Id = " + 
          String.valueOf(pendingWorkload.getWorkloadId()) + 
          "), starting process controller."
        );

        Process aProcess = pendingWorkload.retriveProcessIdAsProcess();

        ControllerIntf controller = createWorkloadController(aProcess);          

        if (controller != null) {
          controller.start(aProcess, pendingWorkload.getWorkloadId());
          workloadManager.setWorkloadProcessing(pendingWorkload);
        }
      }

    } catch (Exception ex) {
      Events.instance().raiseEvent(
        Application.ApplicationEvent.applicationExceptionEvent.name(), 
        ex,
        Orchestrator.class
      );
    }

    log.info("Polling complete.");
  }

  //___________________________________________________________  

  private ControllerIntf createWorkloadController(Process aProcess) {
    ControllerIntf newController = null;

    switch(aProcess) {
      case LOAD:
        newController = loadController;
        break;

      default:
        log.info(
          "createWorkloadController() does not know the value (" +
          aProcess.name() + 
          ") no controller will be started."
        );
    }

    // If a new controller is created than increase the 
    // count of started controllers so that we know how
    // many are running.
    if (newController != null) {
      incrementProcessInstanceCount();
    }

    return newController;
  }

  //___________________________________________________________

}

LoadController.java

@Name("loadController")
@Scope(ScopeType.STATELESS)
@AutoCreate
public class LoadController implements ControllerIntf {
  //__________________________________________________

  @Logger private Log log;

  @In private EntityManager entityManager; 

  //__________________________________________________

  private String fFileName = "";
  private String fNMDSFileName = "";
  private String fAddtFileName = "";

  //__________________________________________________

  public LoadController(){  }
  //__________________________________________________

  @Asynchronous 
  synchronized public void start(Process aProcess, long aWorkloadId) {
    log.info(
      LoadController.class.getName() + 
      " process thread was started for WorkloadId [" + 
      String.valueOf(aWorkloadId) + "]."
    );
    log.info(entityManager.toString());
    try {
      Query aQuery = entityManager.createQuery(
        "from MisLoad MIS_Load where Workload_Id = " + String.valueOf(aWorkloadId)
      );

      MisLoad misLoadRecord = (MisLoad)aQuery.getSingleResult();

      fFileName = 
        misLoadRecord.getInitiatedBy().toUpperCase() + "_" +
        misLoadRecord.getMdSourceSystem().getMdState().getShortName() + "_" +
        DateUtils.now(DateUtils.FORMAT_FILE) + ".csv"
      ;

      fNMDSFileName = "NMDS_" + fFileName;
      fAddtFileName = "Addt_" + fFileName;

      createDataFile(misLoadRecord.getFileContents());

      ArrayList<String>sasCode = generateSASCode(
        misLoadRecord.getLoadId(),
        misLoadRecord.getMdSourceSystem().getPreloadFile()
      );

      //TODO: As the sas password will be encrypted in the database, we will
      //      need to decrypt it before passing to the below function
      executeLoadSASCode(
        sasCode, 
        misLoadRecord.getInitiatedBy(), 
        misLoadRecord.getSasPassword()
      );

      createWorkloadContentRecords(aWorkloadId, misLoadRecord.getLoadId());

      //TODO: Needs to remove password from DB when complete
      removeTempCSVFiles();

      Events.instance().raiseEvent(
        Application.ApplicationEvent.controllerCompleteEvent.name(), 
        aProcess, 
        aWorkloadId
      );

      log.info(LoadController.class.getName() + " process thread completed.");
    } catch (Exception ex) {
      Events.instance().raiseEvent(
        Application.ApplicationEvent.controllerExceptionEvent.name(),
        aProcess, 
        ex
      );
    }
  }
  //__________________________________________________

  private void createDataFile(byte[] aFileContent) throws Exception {
    File dataFile = 
      new File(ECEConfig.getConfiguration().sas_tempFileDir() + "\\" + fFileName);

    FileUtils.writeBytesToFile(dataFile, aFileContent, true);
  }

  //__________________________________________________

  private ArrayList<String> generateSASCode(long aLoadId, String aSourceSystemPreloadSasFile) {
    String sasTempDir = ECEConfig.getConfiguration().sas_tempFileDir();
    ArrayList<String> sasCode = new ArrayList<String>();

    sasCode.add("%let sOracleUserId = " + ECEConfig.getConfiguration().oracle_username() + ";");
    sasCode.add("%let sOraclePassword = " + ECEConfig.getConfiguration().oracle_password() + ";");
    sasCode.add("%let sOracleSID = " + ECEConfig.getConfiguration().oracle_sid() + ";");
    sasCode.add("%let sSchema = " + ECEConfig.getConfiguration().oracle_username() + ";");
    sasCode.add("%let sECESASSourceDir = " + ECEConfig.getConfiguration().sas_sourceDir() + ";");    
    sasCode.add("libname lOracle ORACLE user=&sOracleUserId pw=&sOraclePassword path=&sOracleSID schema=&sSchema;");

    sasCode.add("%let sCommaDelimiter = %str(" + ECEConfig.getConfiguration().dataload_csvRawDataFileDelimiter() + ");");
    sasCode.add("%let sPipeDelimiter = %nrquote(" + ECEConfig.getConfiguration().dataload_csvNMDSDataFileDelimiter() + ");");
    sasCode.add("%let sDataFileLocation = " + sasTempDir + "\\" + fFileName + ";");
    sasCode.add("%let sNMDSOutputDataFileLoc = " + sasTempDir + "\\" + fNMDSFileName + ";");
    sasCode.add("%let sAddtOutputDataFileLoc = " + sasTempDir + "\\" + fAddtFileName + ";");
    sasCode.add("%let iLoadId = " + String.valueOf(aLoadId) + ";");

    sasCode.add("%include \"&sECESASSourceDir\\ECE_UtilMacros.sas\";");
    sasCode.add("%include \"&sECESASSourceDir\\" + aSourceSystemPreloadSasFile + "\";");
    sasCode.add("%include \"&sECESASSourceDir\\ECE_NMDSLoad.sas\";");
    sasCode.add("%preload(&sDataFileLocation, &sCommaDelimiter, &sNMDSOutputDataFileLoc, &sAddtOutputDataFileLoc, &sPipeDelimiter);");
    sasCode.add("%loadNMDS(lOracle, &sNMDSOutputDataFileLoc, &sAddtOutputDataFileLoc, &sPipeDelimiter, &iLoadId);");

    return sasCode;
  }

  //__________________________________________________

  private void executeLoadSASCode(
    ArrayList<String> aSasCode, String aUserName, String aPassword) throws Exception 
  {
    SASExecutor aSASExecutor = new SASExecutor(
      ECEConfig.getConfiguration().sas_server(),
      ECEConfig.getConfiguration().sas_port(),
      aUserName, 
      aPassword
    );

    aSASExecutor.execute(aSasCode);

    log.info(aSASExecutor.getCompleteSasLog());
  }
  //__________________________________________________

  /**
   * Creates the MIS_UR_Workload_Contents records for 
   * the ECE Unit Record data that was just loaded
   * 
   * @param aWorkloadId
   * @param aMisLoadId
   * @throws Exception
   */

  private void createWorkloadContentRecords(long aWorkloadId, long aMisLoadId) throws Exception {

    String selectionRule = 
      " from EceUnitRecord ECE_Unit_Record where ECE_Unit_Record.loadId = " + 
      String.valueOf(aMisLoadId)
    ;
    MisWorkload misWorkload = entityManager.find(MisWorkload.class, aWorkloadId);
    SeamManualTransaction manualTx = new SeamManualTransaction(
      entityManager, 
      ECEConfig.getConfiguration().manualSeamTxTimeLimit()
    );
    manualTx.begin();
    RecordPager oPager = new RecordPager(
      entityManager, 
      selectionRule, 
      ECEConfig.getConfiguration().recordPagerDefaultPageSize()
    );

    Object nextRecord = null;

    while ((nextRecord = oPager.getNextRecord()) != null) {
      EceUnitRecord aEceUnitRecord = (EceUnitRecord)nextRecord;

      MisUrWorkloadContents aContentsRecord = new MisUrWorkloadContents();

      aContentsRecord.setEceUnitRecordId(aEceUnitRecord.getEceUnitRecordId());
      aContentsRecord.setMisWorkload(misWorkload);
      aContentsRecord.setProcessOutcome('C');

      entityManager.persist(aContentsRecord);
    }

    manualTx.commit();
  }

  /**
   * Removes the CSV temp files that are created for input 
   * into the SAS server and that are created as output.  
   */

  private void removeTempCSVFiles() {
    String sasTempDir = ECEConfig.getConfiguration().sas_tempFileDir();
    File dataInputCSV = new File(sasTempDir + "\\" + fFileName);
    File nmdsOutputCSV = new File(sasTempDir + "\\" + fNMDSFileName);
    File addtOutputCSV = new File(sasTempDir + "\\" + fAddtFileName);

    if (dataInputCSV.exists()) {
      dataInputCSV.delete();
    }
    if (nmdsOutputCSV.exists()) {
      nmdsOutputCSV.delete();
    }

    if (addtOutputCSV.exists()) {
      addtOutputCSV.delete();
    }
  }
}

SeamManualTransaction.java

public class SeamManualTransaction {

  //___________________________________________________________

  private boolean fObjectUsed = false;
  private boolean fJoinExistingTransaction = true;
  private int fTransactionTimeout = 60; // Default: 60 seconds
  private UserTransaction fUserTx;
  private EntityManager fEntityManager;

  //___________________________________________________________

  /**
   * Set the transaction timeout in milliseconds (from minutes)
   * 
   * @param aTimeoutInMins The number of minutes to keep the transaction active
   */

  private void setTransactionTimeout(int aTimeoutInSecs) {
    // 60 * aTimeoutInSecs = Timeout in Seconds
    fTransactionTimeout = 60 * aTimeoutInSecs;
  }

  //___________________________________________________________

  /**
   * Constructor 
   * 
   * @param aEntityManager
   */

  public SeamManualTransaction(EntityManager aEntityManager) {
    fEntityManager = aEntityManager;
  }

  //___________________________________________________________

  /**
   * Constructor
   * 
   * @param aEntityManager
   * @param aTimeoutInSecs
   */

  public SeamManualTransaction(EntityManager aEntityManager, int aTimeoutInSecs) {
    setTransactionTimeout(aTimeoutInSecs);
    fEntityManager = aEntityManager;
  }

  //___________________________________________________________

  /**
   * Constructor
   * 
   * @param aEntityManager
   * @param aTimeoutInSecs
   * @param aJoinExistingTransaction
   */
  public SeamManualTransaction(EntityManager aEntityManager, int aTimeoutInSecs, boolean aJoinExistingTransaction) {
    setTransactionTimeout(aTimeoutInSecs);
    fJoinExistingTransaction = aJoinExistingTransaction;
    fEntityManager = aEntityManager;
  }

  //___________________________________________________________

  /**
   * Starts the new transaction
   * 
   * @throws Exception
   */

  public void begin() throws Exception {
    if (fObjectUsed) {
      throw new Exception(
        SeamManualTransaction.class.getCanonicalName() + 
        " has been used. Create new instance."
      );
    }

    fUserTx = 
      (UserTransaction) org.jboss.seam.Component.getInstance("org.jboss.seam.transaction.transaction"); 

    fUserTx.setTransactionTimeout(fTransactionTimeout);
    fUserTx.begin(); 

    /* If entity manager is created before the transaction 
     * is started (ie. via Injection) then it must join the 
     * transaction 
     */ 
    if (fJoinExistingTransaction) {
      fEntityManager.joinTransaction();
    }
  }

  //___________________________________________________________

  /**
   * Commit the transaction to the database
   * 
   * @throws Exception
   */

  public void commit() throws Exception {
    fObjectUsed = true;
    fUserTx.commit();
  }

// ___________________________________________________________

/ ** * Откатывает транзакцию обратно * * Исключения @throws * /

public void rollback () создает исключение { fObjectUsed = true; fUserTx.rollback (); }

// ___________________________________________________________ }

Ответы [ 2 ]

0 голосов
/ 05 августа 2010

Как правило, внедрение entityManager в компонент Seam области APPLICATION является неправильным. Менеджер сущностей - это то, что вы создаете, используете и закрываете снова, в области, обычно намного короче, чем область ПРИЛОЖЕНИЯ.

Улучшите ситуацию, выбрав меньшие области со стандартным внедрением entityManager или, если вам нужна область APPLICATION, вместо этого введите EntityManagerFactory, а затем создайте, используйте и закройте entityManager самостоятельно.

Загляните в ваш файл Seam components.xml, чтобы найти имя вашего компонента EntityManagerFactory.

0 голосов
/ 31 июля 2010

Ну, мой первый совет:

Если вы используете приложение EJB, предпочтите использовать Управляемая транзакция Bean вместо вашей пользовательской SeamManualTransaction .Когда вы используете управляемую Bean-транзакцию, вы, как разработчик, заботитесь о том, чтобы вызывать begin и commit.Вы получаете эту функцию с помощью компонента UserTransaction.Вы можете создать слой Фасад, который начинается и фиксирует вашу Транзакцию.Что-то вроде

/**
  * default scope when using @Stateless session bean is ScopeType.STATELESS
  *
  * So you do not need to declare @Scope(ScopeType.STATELESS) anymore
  *
  * A session bean can not use both BEAN and CONTAINER Transaction management at The same Time
  */
@Stateless
@Name("businessFacade")
@TransactionManagement(TransactionManagerType.BEAN)
public class BusinessFacade implements BusinessFacadeLocal {

    private @Resource TimerService timerService;
    private @Resource UserTransaction userTransaction;
    /**
      * You can use @In of you are using Seam capabilities
      */
    private @PersistenceContext entityManager;

    public void doSomething() {
        try {
            userTransaction.begin();
            userTransaction.setTransactionTimeout(int seconds);

            // business logic goes here

            /**
              * To enable your Timer service, just call
              *
              * timerService.createTimer(15*60*1000, 15*60*1000, <ANY_SERIALIZABLE_INFO_GOES_HERE>);
              */

            userTransaction.commit();
        } catch (Exception e) {
            userTransaction.rollback();
        }
    }

    @Timeout
    public void doTimer(Timer timer) {
        try {
            userTransaction.begin();         

            timer.getInfo();

            // logic goes here

            userTransaction.commit();
        } catch (Exception e) {
            userTransaction.rollback();
        }
    }

}

Давайте посмотрим UserTransaction.begin API метода

Создайте новую транзакцию и свяжите ее с текущим потоком

Существует еще:

Время жизни контекста персистентности, управляемого контейнером (внедренного через аннотацию @PersistenceContext), соответствует области действиятранзакция (между началом и вызовом метода commit) при использовании контекста персистентности в области транзакции

Теперь давайте посмотрим TimerService

Это предоставляемая контейнером служба, которая позволяет корпоративным компонентам регистрироваться для методов обратного вызова таймера , которые происходят в указанное время, по истечении заданного времени или после указанных интервалов.Класс EJB корпоративного компонента , который использует службу таймера, должен обеспечивать метод обратного вызова тайм-аута .Таймеры могут быть созданы для сеансовых компонентов без сохранения состояния, управляемых сообщениями bean-компонентов

Надеюсь, это может быть полезно для вас

...