Как избежать двойной вставки в Oracle с многопоточностью - PullRequest
1 голос
/ 18 апреля 2019

В моем приложении, когда я делаю INSERT в Oracle, я получил много исключений о двойной вставке.

Мой тестовый код выглядит так

class SomeClass{
EntityManager em;
Dao dao;

@Override
void insert(String a, String b){
    MyObject object =new MyObject(a,b);
    dao.insertObject(object);
    }
}

class OtherClass{
    private final ExecutorService completableFutureExecutor =
            new ThreadPoolExecutor(10, 11, 30L, TimeUnit.SECONDS, new SynchronousQueue<>());

    public void method() {

        Runnable task1 = () -> dao.insert("a","b");
        for (int i = 0; i < 5; i++) {
            completableFutureExecutor.submit(task1);
        }
    }
}

В журнале openJpa я вижучто-то в этом роде

240981  JpaPersistenceUnit  TRACE  [pool-25-thread-3] openjpa.jdbc.SQL - <t 1427395137, conn 1023570122> executing prepstmnt 743213969 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]
240983  JpaPersistenceUnit  TRACE  [pool-25-thread-9] openjpa.jdbc.SQL - <t 1116539025, conn 246735198> executing prepstmnt 468904024 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]
240986  JpaPersistenceUnit  TRACE  [pool-25-thread-5] openjpa.jdbc.SQL - <t 2107513837, conn 1168031715> executing prepstmnt 1872262728 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]
240986  JpaPersistenceUnit  TRACE  [pool-25-thread-1] openjpa.jdbc.SQL - <t 1881630463, conn 2024928498> executing prepstmnt 1258578230 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]
240986  JpaPersistenceUnit  TRACE  [pool-25-thread-7] openjpa.jdbc.SQL - <t 1202968848, conn 1876787130> executing prepstmnt 1733696457 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]

240998  JpaPersistenceUnit  TRACE  [pool-25-thread-9] openjpa.jdbc.SQL - <t 1116539025, conn 246735198> executing prepstmnt 752805342 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params= (String) a,   (String) b]
240999  JpaPersistenceUnit  TRACE  [pool-25-thread-3] openjpa.jdbc.SQL - <t 1427395137, conn 1023570122> executing prepstmnt 1035550395 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params= (String) a,   (String) b]
240999  JpaPersistenceUnit  TRACE  [pool-25-thread-5] openjpa.jdbc.SQL - <t 2107513837, conn 1168031715> executing prepstmnt 1439514282 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params=  (String) a,   (String) b]
241000  JpaPersistenceUnit  TRACE  [pool-25-thread-1] openjpa.jdbc.SQL - <t 1881630463, conn 2024928498> executing prepstmnt 1158780577 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params=  (String) a,   (String) b]
241000  JpaPersistenceUnit  TRACE  [pool-25-thread-7] openjpa.jdbc.SQL - <t 1202968848, conn 1876787130> executing prepstmnt 1082517334 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params=  (String) a,   (String) b]



41018  JpaPersistenceUnit  TRACE  [pool-25-thread-4] openjpa.Runtime - An exception occurred while ending the transaction.  This exception will be re-thrown.<openjpa-2.4.0-r422266:1674604 fatal store error> org.apache.openjpa.util.StoreException: The transaction has been rolled back.  See the nested exceptions for details on the errors that occurred.
FailedObject: com.test.SomeClass@19df04ab
	at org.apache.openjpa.kernel.BrokerImpl.newFlushException(BrokerImpl.java:2368)
	at org.apache.openjpa.kernel.BrokerImpl.flush(BrokerImpl.java:2205)
	at org.apache.openjpa.kernel.BrokerImpl.flushSafe(BrokerImpl.java:2103)
	at org.apache.openjpa.kernel.BrokerImpl.beforeCompletion(BrokerImpl.java:2021)
	at org.apache.openjpa.kernel.LocalManagedRuntime.commit(LocalManagedRuntime.java:81)
	at org.apache.openjpa.kernel.BrokerImpl.commit(BrokerImpl.java:1526)
	at org.apache.openjpa.kernel.DelegatingBroker.commit(DelegatingBroker.java:932)
	at org.apache.openjpa.persistence.EntityManagerImpl.commit(EntityManagerImpl.java:569)
	at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:514)
	at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:755)
	at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:724)
	at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:475)
	at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:270)
	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:94)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:633)
	at com.test.Dao$$EnhancerBySpringCGLIB$$c4aa5f08.insertObject(<generated>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
	at org.springframework.osgi.service.importer.support.internal.aop.ServiceInvoker.doInvoke(ServiceInvoker.java:58)
	at org.springframework.osgi.service.importer.support.internal.aop.ServiceInvoker.invoke(ServiceInvoker.java:62)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:132)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:120)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
	at org.springframework.osgi.service.util.internal.aop.ServiceTCCLInterceptor.invokeUnprivileged(ServiceTCCLInterceptor.java:56)
	at org.springframework.osgi.service.util.internal.aop.ServiceTCCLInterceptor.invoke(ServiceTCCLInterceptor.java:39)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
	at org.springframework.osgi.service.importer.support.LocalBundleContextAdvice.invoke(LocalBundleContextAdvice.java:59)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:132)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:120)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
	at com.sun.proxy.$Proxy255.insertObject(Unknown Source)
	at com.test.OtherClass.lambda$method$5(OtherClass.java:146)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: <openjpa-2.4.0-r422266:1674604 fatal store error> org.apache.openjpa.util.ObjectExistsException: ORA-00001: unique constraint (TABLE_PK) violated
 {prepstmnt 1119780936 INSERT INTO TABLE (COLUMN1, COLUMN2) VALUES (?, ?) [params=(String) a, (String) b]} [code=1, state=23000]
FailedObject: com.test.entities.Table@19df04ab
	at org.apache.openjpa.jdbc.sql.DBDictionary.narrow(DBDictionary.java:4986)
	at org.apache.openjpa.jdbc.sql.DBDictionary.newStoreException(DBDictionary.java:4961)
	at org.apache.openjpa.jdbc.sql.SQLExceptions.getStore(SQLExceptions.java:133)
	at org.apache.openjpa.jdbc.sql.SQLExceptions.getStore(SQLExceptions.java:75)
	at org.apache.openjpa.jdbc.kernel.BatchingPreparedStatementManagerImpl.flushBatch(BatchingPreparedStatementManagerImpl.java:225)
	at org.apache.openjpa.jdbc.kernel.BatchingConstraintUpdateManager.flush(BatchingConstraintUpdateManager.java:63)
	at org.apache.openjpa.jdbc.kernel.AbstractUpdateManager.flush(AbstractUpdateManager.java:104)
	at org.apache.openjpa.jdbc.kernel.AbstractUpdateManager.flush(AbstractUpdateManager.java:77)
	at org.apache.openjpa.jdbc.kernel.JDBCStoreManager.flush(JDBCStoreManager.java:731)
	at org.apache.openjpa.kernel.DelegatingStoreManager.flush(DelegatingStoreManager.java:131)
	... 43 more
Caused by: org.apache.openjpa.lib.jdbc.ReportingSQLException: ORA-00001: unique constraint (TABLE_PK) violated
 {prepstmnt 1119780936 INSERT INTO TABLE (COLUMN1, COLUMN2) VALUES (?, ?) [params=(String) a, (String) b]} [code=1, state=23000]
	at org.apache.openjpa.lib.jdbc.LoggingConnectionDecorator.wrap(LoggingConnectionDecorator.java:218)
	at org.apache.openjpa.lib.jdbc.LoggingConnectionDecorator.wrap(LoggingConnectionDecorator.java:194)
	at org.apache.openjpa.lib.jdbc.LoggingConnectionDecorator.access$1000(LoggingConnectionDecorator.java:58)
	at org.apache.openjpa.lib.jdbc.LoggingConnectionDecorator$LoggingConnection$LoggingPreparedStatement.executeUpdate(LoggingConnectionDecorator.java:1133)
	at org.apache.openjpa.lib.jdbc.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:275)
	at org.apache.openjpa.jdbc.kernel.JDBCStoreManager$CancelPreparedStatement.executeUpdate(JDBCStoreManager.java:1791)
	at org.apache.openjpa.jdbc.kernel.PreparedStatementManagerImpl.executeUpdate(PreparedStatementManagerImpl.java:268)
	at org.apache.openjpa.jdbc.kernel.BatchingPreparedStatementManagerImpl.flushSingleRow(BatchingPreparedStatementManagerImpl.java:254)
	at org.apache.openjpa.jdbc.kernel.BatchingPreparedStatementManagerImpl.flushBatch(BatchingPreparedStatementManagerImpl.java:157)
	... 48 more

Как мне этого избежать?Потому что в работе много таких ошибок.

UPD добавлен новый журнал в фрагмент.

Мое приложение находится на обоих серверах (узлах).Каждый сервер подключен к БД.Итак, мой тест мы можем умножить на 2.

Ответы [ 4 ]

1 голос
/ 18 апреля 2019

Вы можете синхронизировать ваш doa-объект, то есть он может быть запущен только одним потоком за раз.

  @Override
  void insert(String a, String b) {
    MyObject object = new MyObject(a, b);
    synchronized (dao) {
      dao.insertObject(object);
    }
  }

Нечто подобное выше

Основным преимуществом синхронизированного ключевого слова является возможность разрешения данных. проблемы несоответствия. Но главный недостаток синхронизирован Ключевое слово - увеличивает время ожидания потока и влияет на производительность. системы. Следовательно, если нет конкретного требования, оно никогда не будет рекомендуется использовать синхронизированное ключевое слово.

0 голосов
/ 18 апреля 2019

Возможно, вы захотите рассмотреть отложенную запись из объекта доступа к данным, что-то вроде этого -

class SomeClass {
    private EntityManager em;
    private Dao dao;
    private Set<MyObject> writableObjects = new HashSet<>();

    @Override
    public void insert(String a, String b) {
        MyObject object = new MyObject(a, b);
        writableObjects.add(object);
    }

    @override
    public void commit() {
        writableObjects.forEach(object -> dao.insertObject(object));
    }
}

class OtherClass {
    private final ExecutorService completableFutureExecutor = new ThreadPoolExecutor(10, 11, 30L, TimeUnit.SECONDS,
            new SynchronousQueue<>());

    public void method() {

        Runnable task1 = () -> dao.insert("a", "b");
        for (int i = 0; i < 5; i++) {
            completableFutureExecutor.submit(task1);
        }

        dao.commit();
    }
}
0 голосов
/ 18 апреля 2019

Я сейчас использую memcache, но это не окончательное решение

 private int expire = 2;

 public <T> void insert(Supplier<T> supplier, String... keyLine) throws InterruptedException, MemcachedException, TimeoutException {
        String key = ParseUtils.collectToKeyWithDot(keyLine);
        T value = getCache(key);
        if (value == null) {
            value = supplier.get();
            setCache(key, value);
        } 
    }

 private <T> Boolean setCache(String key, T value) throws InterruptedException, MemcachedException, TimeoutException {
        return memcacheClient.set(key, expire, value);
    }

    private <T> T getCache(String key) throws InterruptedException, MemcachedException, TimeoutException {
        return memcacheClient.get(key);
    }

Здесь я сохраняю свои первые INSERT в БД, а также в кеш на 2 секунды.В течение этих 2 секунд, если какой-либо Поток попытается вставить те же значения в БД, и этого не произойдет.Прежде всего, это будет проверено в кеше.Для меня достаточно 2 секунд, чтобы избежать исключений.

PS Я все еще ищу более элегантное решение.

0 голосов
/ 18 апреля 2019

Использование одного EntityManager из параллельных потоков, скорее всего, плохая идея, как вы, наверное, видели из журналов: в лучшем случае исключение в одном потоке откатывает одну транзакцию EM, которую пробуют все остальные потокииспользовать тоже.В худшем случае это всевозможные проблемы с параллелизмом, условия гонки и тому подобное.

Я хочу вставить его только один раз, и, если он уже есть в БД, выберите его.Но у меня есть около 5 методов в разных классах, которые могут вызывать эту INSERT во времени, поэтому это исключение должно быть обработано

В этом случае у вас, вероятно, не будет другого выбора, кроме как синхронизировать ваш DAOзвонки другими способами;может быть сделано путем блокировки какой-либо существующей записи в БД или любым другим способом.Вы также можете попробовать EntityManager.merge(), но я не думаю, что это решило бы вашу проблему одновременной записи двух отдельных машин.

...