Как реализовать EJB 3.0 с помощью EJB 3.0? - PullRequest
3 голосов
/ 31 марта 2010

[РЕДАКТИРОВАТЬ] Этот вопрос «как я могу сделать атомарные изменения в компонентах EJB 3 и JPA 2.0». Должно быть просто, верно?

Я пытался исправить свой код на основе ответов, которые я получил до сих пор. Я использую JBoss 6.0.0M2 с Hypersonic (просто скачайте его и вызовите run.bat).

Мой тестовый пример: создайте 3 потока и вызовите один из testCounterMitLock*() 500 раз в цикле. Так что успешный тест должен напечатать «Anzahl eingetragene Zeilen: 1500» (3 * 500).

Я пытался:

        CounterTestVersion ct = manager.find(CounterTestVersion.class, 1);
        manager.lock(ct, LockModeType.WRITE);
        int wert = ct.getWert();

Очевидно, что не работает, потому что другой поток может изменить значение в базе данных до применения блокировки. Поэтому я пытаюсь это исправить:

        CounterTestVersion ct = manager.find(CounterTestVersion.class, 1);
        manager.lock(ct, LockModeType.WRITE);
        manager.refresh (ct);
        int wert = ct.getWert();

refresh() должен дать мне текущее значение, а неявный запрос также должен убедиться, что объект теперь заблокирован. Нет такой удачи. Давайте попробуем с JPA 2.0:

        CounterTestVersion ct = manager.find(CounterTestVersion.class, 1, LockModeType.WRITE);
        int wert = ct.getWert();

Это тоже не работает. Может быть, блокировки недостаточно?

        CounterTestVersion ct = manager.find(CounterTestVersion.class, 1, LockModeType.PESSIMISTIC_WRITE);
        int wert = ct.getWert();

Хм ... тоже не работает! Последняя отчаянная попытка:

        CounterTestVersion ct = manager.find(CounterTestVersion.class, 1, LockModeType.PESSIMISTIC_WRITE);
        manager.flush();
        manager.refresh (ct);
        int wert = ct.getWert();

Хорошо ... кто-нибудь может объяснить, почему ничего не работает? У меня нет идей.

[EDIT2] PS: чтобы добавить оскорбление к травме, это последний вывод последнего запущенного потока:

commit/rollback: 441/62

(441 + 62 = 503) ...

Вот полный код. Сначала боб:

package server.kap15;

import java.rmi.RemoteException;

import javax.ejb.*;
import javax.persistence.*;

@Stateful
public class CounterTestBean implements CounterTestRemote, SessionSynchronization {
    @PersistenceContext(unitName = "JavaEE")
    EntityManager manager;

    private int commit = 0;

    private int rollback = 0;

    public void initDatenbank() {
        manager.createNamedQuery("CounterTest.deleteAll").executeUpdate();
        manager.createNamedQuery("TestTabelle.deleteAll").executeUpdate();
        CounterTestVersion ct = new CounterTestVersion();
        ct.setNr(1);
        ct.setVersion(1);
        ct.setWert(1);
        manager.persist(ct);
    }

    public boolean testCounterOhneLock() {
        try {
            CounterTest ct = manager.find(CounterTest.class, 1);
            int wert = ct.getWert();
            ct.setWert(wert + 1);
            TestTabelle tt = new TestTabelle();
            tt.setNr(wert);
            manager.persist(tt);
            manager.flush();
            return true;
        } catch (Throwable t) {
            return false;
        }
    }

    public boolean testCounterMitLock() {
        try {
            CounterTestVersion ct = manager.find(CounterTestVersion.class, 1);
            manager.lock(ct, LockModeType.WRITE);
            int wert = ct.getWert();
            ct.setWert(wert + 1);
            TestTabelle tt = new TestTabelle();
            tt.setNr(wert);
            manager.persist(tt);
            manager.flush();
            return true;
        } catch (Throwable t) {
            return false;
        }
    }

    public boolean testCounterMitLock2() {
        try {
            CounterTestVersion ct = manager.find(CounterTestVersion.class, 1);
            manager.lock(ct, LockModeType.WRITE);
            manager.refresh (ct);
            int wert = ct.getWert();
            ct.setWert(wert + 1);
            TestTabelle tt = new TestTabelle();
            tt.setNr(wert);
            manager.persist(tt);
            manager.flush();
            return true;
        } catch (Throwable t) {
            return false;
        }
    }

    public boolean testCounterMitLock3() {
        try {
            CounterTestVersion ct = manager.find(CounterTestVersion.class, 1, LockModeType.WRITE);
            int wert = ct.getWert();
            ct.setWert(wert + 1);
            TestTabelle tt = new TestTabelle();
            tt.setNr(wert);
            manager.persist(tt);
            manager.flush();
            return true;
        } catch (Throwable t) {
            return false;
        }
    }

    public boolean testCounterMitLock4() {
        try {
            CounterTestVersion ct = manager.find(CounterTestVersion.class, 1, LockModeType.PESSIMISTIC_WRITE);
            int wert = ct.getWert();
            ct.setWert(wert + 1);
            TestTabelle tt = new TestTabelle();
            tt.setNr(wert);
            manager.persist(tt);
            manager.flush();
            return true;
        } catch (Throwable t) {
            return false;
        }
    }

    public boolean testCounterMitLock5() {
        try {
            CounterTestVersion ct = manager.find(CounterTestVersion.class, 1, LockModeType.PESSIMISTIC_WRITE);
            manager.flush();
            manager.refresh (ct);
            int wert = ct.getWert();
            ct.setWert(wert + 1);
            TestTabelle tt = new TestTabelle();
            tt.setNr(wert);
            manager.persist(tt);
            manager.flush();
            return true;
        } catch (Throwable t) {
            return false;
        }
    }

    public boolean testCounterMitVersion() {
        try {
            CounterTestVersion ctv = manager.find(CounterTestVersion.class, 1);
            int wert = ctv.getWert();
            ctv.setWert(wert + 1);
            manager.flush();
            TestTabelle tt = new TestTabelle();
            tt.setNr(wert);
            manager.persist(tt);
            manager.flush();
            return true;
        } catch (OptimisticLockException e) {
            System.out.println(">>> Versionskonflikt !");
            return false;
        } catch (Throwable t) {
            System.out.println(t.getMessage());
            return false;
        }
    }

    public long anzTestZeilen() {
        Query query = manager.createNamedQuery("TestTabelle.anzZeilen");
        Long anzahl = (Long) query.getSingleResult();
        return anzahl;
    }

    public void afterBegin() throws EJBException, RemoteException {
    }

    public void beforeCompletion() throws EJBException, RemoteException {
    }

    public void afterCompletion(boolean committed) throws EJBException,
    RemoteException {
        if (committed)
            commit++;
        else
            rollback++;
        System.out.println("commit/rollback: " + commit + "/" + rollback);
    }
}

Удаленный интерфейс:

package server.kap15;

import javax.ejb.Remote;

@Remote
public interface CounterTestRemote {
    public void initDatenbank();

    public boolean testCounterOhneLock();

    public boolean testCounterMitLock();
    public boolean testCounterMitLock2();
    public boolean testCounterMitLock3();
    public boolean testCounterMitLock4();
    public boolean testCounterMitLock5();

    public boolean testCounterMitVersion();

    public long anzTestZeilen();
}

Persistence.xml:

<?xml version="1.0" encoding="UTF-8"?>
<persistence xmlns="http://java.sun.com/xml/ns/persistence"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://java.sun.com/xml/ns/persistence
        http://java.sun.com/xml/ns/persistence/persistence_1_0.xsd"
    version="1.0">
    <persistence-unit name="JavaEE">
        <jta-data-source>java:DefaultDS</jta-data-source>
    </persistence-unit>
</persistence>

Тестовый клиент:

package client.kap15;

import java.util.Properties;
import javax.naming.*;
import javax.rmi.PortableRemoteObject;
import server.kap15.CounterTestRemote;

public class CounterTestMitLock extends Thread {
    CounterTestRemote ctr;

    public static void main(String[] args) {
        try
        {
            testMitLock();
            testMitLock2();
            testMitLock3();
            testMitLock4();
            testMitLock5();
        }
        catch (Exception e)
        {
            e.printStackTrace ();
        }
    }

    static int N = 3;
    static CounterThread[] ct = new CounterThread[N];

    private static void testMitLock () throws InterruptedException
    {
        System.out.println("--- Counter Test MIT Lock ----------------------");
        System.out.println("Testinstanzen erzeugen...");
        for (int i=0; i<N; i++)
            ct[i] = new CounterThreadMitLock();

        runTest ();
    }

    private static void testMitLock2 () throws InterruptedException
    {
        System.out.println("--- Counter Test MIT Lock2 ----------------------");
        System.out.println("Testinstanzen erzeugen...");
        for (int i=0; i<N; i++)
            ct[i] = new CounterThreadMitLock2();

        runTest ();
    }

    private static void testMitLock3 () throws InterruptedException
    {
        System.out.println("--- Counter Test MIT Lock3 ----------------------");
        System.out.println("Testinstanzen erzeugen...");
        for (int i=0; i<N; i++)
            ct[i] = new CounterThreadMitLock3();

        runTest ();
    }

    private static void testMitLock4 () throws InterruptedException
    {
        System.out.println("--- Counter Test MIT Lock4 ----------------------");
        System.out.println("Testinstanzen erzeugen...");
        for (int i=0; i<N; i++)
            ct[i] = new CounterThreadMitLock4();

        runTest ();
    }

    private static void testMitLock5 () throws InterruptedException
    {
        System.out.println("--- Counter Test MIT Lock5 ----------------------");
        System.out.println("Testinstanzen erzeugen...");
        for (int i=0; i<N; i++)
            ct[i] = new CounterThreadMitLock5();

        runTest ();
    }

    private static void runTest () throws InterruptedException
    {
        System.out.println("Datenbank initialisieren...");
        ct[0].ctr.initDatenbank();

        System.out.println("Test durchführen...");
        for (int i=0; i<N; i++)
            ct[i].start();

        System.out.println("Auf Ende warten...");
        for (int i=0; i<N; i++)
            ct[i].join();

        System.out.println("Anzahl eingetragene Zeilen: " + ct[0].ctr.anzTestZeilen());
    }

    private static CounterTestRemote verbinden() {
        try {
            Properties p = new Properties();
            p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
            p.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
            p.put(Context.PROVIDER_URL, "jnp://localhost:1099");
            Context ctx = new InitialContext(p);

            Object ref = ctx.lookup("CounterTestBean/remote");
            CounterTestRemote ctr = (CounterTestRemote) PortableRemoteObject.narrow(ref, CounterTestRemote.class);

            return ctr;
        } catch (NamingException e) {
            System.out.println("ERROR - NamingException!");
            System.exit(-1);
        }
        return null;
    }

    public abstract static class CounterThread extends Thread
    {
        protected CounterTestRemote ctr;

        public CounterThread ()
        {
            this.ctr = verbinden ();
        }

        public void run() {
            for (int i = 0; i < 500; i++)
                test ();
        }

        public abstract void test ();
    }

    public static class CounterThreadMitLock extends CounterThread
    {
        @Override
        public void test ()
        {
            this.ctr.testCounterMitLock();
        }

    }

    public static class CounterThreadMitLock2 extends CounterThread
    {
        @Override
        public void test ()
        {
            this.ctr.testCounterMitLock2();
        }

    }

    public static class CounterThreadMitLock3 extends CounterThread
    {
        @Override
        public void test ()
        {
            this.ctr.testCounterMitLock3();
        }

    }

    public static class CounterThreadMitLock4 extends CounterThread
    {
        @Override
        public void test ()
        {
            this.ctr.testCounterMitLock4();
        }

    }

    public static class CounterThreadMitLock5 extends CounterThread
    {
        @Override
        public void test ()
        {
            this.ctr.testCounterMitLock5();
        }

    }
}

Ответы [ 4 ]

2 голосов
/ 31 марта 2010

Поскольку ни один из режимов блокировки не работал, я попробовал решение ewernli с руководством SELECT ... FOR UPDATE. Это дало интересное исключение: «Неожиданный токен FOR». Поэтому я посмотрел на базу данных.

JBoss устанавливается с Hypersonic 1.8 (HSQLDB) по умолчанию, который не поддерживает блокировку строк. Уважаемые разработчики JBoss: реализация JPA должна выдавать исключение, когда режим блокировки не поддерживается.

Поэтому я добавил источник данных Oracle и изменил свой файл persistence.xml. После этого работают два теста:

        CounterTestVersion ct = manager.find(CounterTestVersion.class, 1, LockModeType.PESSIMISTIC_WRITE);
        int wert = ct.getWert();

и

    Query query = manager.createNativeQuery ("select * from COUNTER_TEST where NR = 1 for update", CounterTestVersion.class);
    CounterTestVersion ct = (CounterTestVersion)query.getSingleResult ();
    int wert = ct.getWert ()+1;

что интересно. Это должно работать с LockModeType.PESSIMISTIC_FORCE_INCREMENT тоже. В этом случае я вижу эту ошибку в журнале:

ORA-00054: resource busy and acquire with NOWAIT specified

Это происходит при звонке manager.find(). Я не могу понять, почему они ведут себя по-разному в фазе нагрузки. Возможно, ошибка в JBoss или Hibernate.

1 голос
/ 31 марта 2010

У меня есть несколько замечаний:

  • В настоящее время вы используете оптимистическую блокировку, но я не вижу поля @Version в вашей сущности. Я не думаю, что это может сработать.
  • Если вы хотите увеличить счетчик в 1500 раз, не используйте оптимистическую блокировку (вы не хотите, чтобы обновление отклонялось с OptimisticLockingException), но пессимистическую блокировку.
  • Глотание Throwable действительно неправильно, вы хотите, чтобы контейнер выполнял свою работу (но я думаю, вы это знаете).

Итак, здесь я бы использовал это вместо:

manager.lock(ct, LockModeType.READ);

И удалите catch (Throwable t).

Обновление: Я не могу проверить это прямо сейчас, но я бы использовал что-то вроде этого (остальная часть кода не изменилась):

public boolean testCounterWithLock() {
    CounterTest ct = manager.find(CounterTest.class, 1);
    manager.lock(ct, LockModeType.READ);
    int counter = ct.getCounter();
    ct.setCounter(counter + 1);
    manager.flush();
    return true;
}

Я действительно сомневаюсь, что это сработает. Прежде всего, блокировка чтения не мешает другим потокам обновлять строку. Во-вторых, другой поток может обновить строку между find () и getCounter ()

Вы правы, я пошел слишком быстро, и вышесказанное, безусловно, не является решением, и @ewernli также прав, JPA 1.0 не имеет поддержки стратегий пессимистичной блокировки, вам придется полагаться на базу данных для этого ( и использовать семантику SELECT FOR UPDATE). Каким-то образом мне удалось забыть об этом и сильно путать с режимом чтения. Виноват. Спасибо за указание на это.

Я думаю, что вы должны использовать LockModeType.WRITE, но, возможно, вы можете использовать em.refresh () после lock (), чтобы убедиться, что объект не устарел?

При использовании LockModeType.WRITE поле сущности, обозначенное @Version, добавляется в предложение WHERE UPDATE, а проверка параллелизма выполняется во время UPDATE:

UPDATE COUNTERTEST SET COUNTER = ?, OPT_LOCK = ? 
WHERE ((ID = ?) AND (OPT_LOCK = ?))

Если предложение WHERE не соответствует записи (поскольку другой поток уже обновил сущность), тогда поставщик сохраняемости выдаст OptimisticLockException.

Другими словами, обновление сущности после lock() ничего не изменит, другой поток все еще может очистить ту же сущность, в то время как другой изменяет счетчик. Единственный способ автоматизировать оптимистическую блокировку - это реализовать механизм повторных попыток.

Но когда flush() выдает PersitenceException (за исключением случаев NoResultException и NonUniqueResultException), текущая транзакция помечается для отката и, следовательно, не может быть использован для транзакционных целей. Таким образом, каждая повторная попытка должна быть выполнена с использованием новой транзакции. В bean-компоненте без учета статических данных вы можете делать рекурсивные удаленные вызовы, но я не думаю, что это имеет смысл в bean-компоненте с отслеживанием состояния, поэтому вам придется обрабатывать его со стороны клиента.

В конце, и это не очень удовлетворительно, на мой взгляд, менее худший способ справиться с этим в JPA 1.0 - получить блокировку с помощью SELECT FOR UPDATE.

1 голос
/ 31 марта 2010

Даже с LockModeType.READ или LockModeType.WRITE, JPA 1.0 поддерживает только оптимистическую блокировку . Получение блокировок все еще может быть отложено до времени фиксации, следовательно, проблема, с которой вы столкнулись.

С Параллелизм и блокировка JPA 2.0 :

PA 1.0 поддерживает только оптимистическое чтение или оптимистическая блокировка записи. JPA 2.0 поддерживает оптимистичный и пессимистичный замок

Другие ресурсы: Производительность EJB3 и Пессимистская блокировка с JPA

Чтобы иметь реальную пессимистическую блокировку с JPA 1.0, вам нужно полагаться на базу данных или расширение, специфичное для реализации. E.g.:

JPA 2.0 (Нечто подобное возможно с Hibernate API)

Account acc = em.find( Account.class, id, PESSIMISTIC );

JPA 1,0

Query query = em.createNativeQuery("SELECT * ... FOR UPDATE"); // works with most db
Account acc = (Account) query.getSingleResult();

По крайней мере, это то, что я наконец-то использовал, потому что lock не сработал, как ожидалось.

(Примечание. Вы также можете реализовать логику повторных попыток, когда возникает оптимистическое исключение. Но это сложно, потому что транзакции управляются сервером приложений. Вам нужно будет использовать @TRANSACTION_NEW, чтобы приостановить текущую транзакцию и запустить новую один и т. д. ... слишком сложно, я думаю!)

0 голосов
/ 31 марта 2010

Вы не показываете, что делаете с возвращаемым значением из testCounterWithLock. Я предполагаю, что вы получаете ошибки оптимистической блокировки, а возвращаемое значение иногда ложно.

Оптимальная блокировка - разумная модель, когда столкновения на практике могут быть нечастыми, а вызывающая сторона может разумно повторить работу. Поэтому, если вы получаете оптимистичные ошибки, вы можете просто повторить попытку.

В качестве альтернативы, используйте модель пессимистической блокировки, которая блокирует строку в базе данных в точке, которую вы прочитали. Вы можете сделать это, добавив LockMode of Pessimistic к вашему вызову find (). Использование пессимистической блокировки должно осуществляться с осторожностью, слишком легко получить плохой параллелизм и / или взаимные блокировки.

...