Как синхронизировать операции? - PullRequest
1 голос
/ 10 апреля 2011

У меня есть эта реализация для двухфазной блокировки. Проблема в том, что он отлично работает в большинстве сценариев, но не во всех. Я выяснил, что проблема возникла из-за моего использования Synchronized, который используется неправильно каким-то образом! Мне нужно, чтобы порядок операций был таким:

  1. Поток, разблокирующий любую блокировку, должен полностью завершить эту фазу (метод ul), прежде чем любой другой поток начнет блокировать такую ​​же блокировку.
  2. Поток, который ожидал блокировки, когда он просыпается, должен получить всех официантов этой блокировки, прежде чем он начнет выполнять свои операции.
  3. любые addEdge или removeEdge должны быть синхронизированы, потому что я использую один и тот же графовый объект со всеми потоками.

Вот код:

class LockTable{
     private HashMap<Integer,MyLock> locks;
     public LockTable(){
         locks= new HashMap<Integer,MyLock>();   
    }

/*******************************************************************************
 * Acquire a shared/read lock on data object oid.
 * @param tid  the transaction id
 * @param oid  the data object id
 */
void rl (int tid, int oid) throws InterruptedException
{

    MyLock lock=null;
    boolean wait = false;
    boolean getIt = false;
    synchronized(this) {
        try {
            lock = locks.get(oid);             // find the lock

            if((lock != null) && (lock.lock.isWriteLocked())){

               wait = true;
            }
            if(lock == null){
             getIt = true;
             lock = new MyLock(tid, true);
             lock.lock.readLock().lock();
             lock.readers.add(tid);
             locks.put(oid, lock);
          }

        } catch(Exception e) {
            System.out.println(e.getStackTrace());        // lock not found, so oid is not locked;
        } // try
    }//synch
    if (wait){
        synchronized(this){
            //System.out.println("Transaction " + tid + " is waiting..");
            lock.waiters.add(tid);
            lock.waitersType.add('s');
            Main.g.addEdge(tid, lock.tid);
            //System.out.println("Edge has been added "+tid + "==>" + lock.tid);
        }//sync
         if(Main.g.hasCycle()){
            //System.out.println("Detect Cycle in rl..Executing Restart");
            restart(tid);
        }
        //to exclude the restarted thread
        if(!Main.trans[tid].terminate){
            lock.lock.readLock().lock();
            lock.readers.add(tid);
          synchronized(this){
            lock.waitersType.remove(lock.waiters.indexOf(tid));
            lock.waiters.remove(lock.waiters.indexOf(tid));
            }//sync
            for(int i =0 ; i < lock.waiters.size();i++){
                if(lock.waitersType.get(i) == 'w'){
                    synchronized(Main.g){
                    Main.g.addEdge(lock.waiters.get(i), tid);
                    //System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                    }//sync
                    if(Main.g.hasCycle())
                        restart(lock.waiters.get(i));
                }//if lock.waitersType
            }//for
        }//if terminate
        else
            return;
    }
   else
    if(!getIt){
              //System.out.println("Getting Shared Lock without Waiting");
              lock.lock.readLock().lock();
              lock.readers.add(tid);
              for(int i =0 ; i < lock.waiters.size();i++){
                if(lock.waitersType.get(i) == 'w'){
                    synchronized(Main.g){
                    Main.g.addEdge(lock.waiters.get(i), tid);
                    //System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                    }
                    if(Main.g.hasCycle())
                        restart(lock.waiters.get(i));
                }//if lock.waitersType
              } 
    }

} // rl

/*******************************************************************************
 * Acquire an exclusive/write lock on data object oid.
 * @param tid  the transaction id
 * @param oid  the data object id
 */
void wl (int tid, int oid) throws InterruptedException
{
     //type to determine the last lock type in order
     //to be able to remove the edges from waitfor graph
    int type = 0;
    MyLock lock = null;
    boolean wait = false;
    boolean getIt = false;
    synchronized(this) {
        try {
            lock = locks.get(oid);             // find the lock
            if(lock != null && (lock.lock.isWriteLocked() || lock.readers.size() > 0))
            {
                wait = true;
            }
            if(lock == null){
                getIt = true;
                lock = new MyLock(tid);
                lock.lock.writeLock().lock();
                locks.put(oid,lock);
            }
        } catch(Exception e) {
            System.out.println(e.getStackTrace());        // lock not found, so oid is not locked;
        } // try
  }
     if (wait){
            //System.out.println("Transaction " + tid + " is waiting..");
           synchronized(this) {
            if(lock.lock.isWriteLocked()){
                Main.g.addEdge(tid, lock.tid);
                //System.out.println("Edge has been added "+tid + "==>" + lock.tid);
               }
            else{
                type = 1;
                for(int reader : lock.readers){
                     Main.g.addEdge(tid, reader);
                     //System.out.println("Edge has been added "+tid + "==>" + reader);
                }
                }//else
            if(Main.g.hasCycle()){
               //System.out.println("Detect Cycle");
               restart(tid);
             }//if
            //System.out.println("Graph: "+Main.g.toString());
         }//sync



       if(!Main.trans[tid].terminate){
            synchronized(this){
            lock.waiters.add(tid);
            lock.waitersType.add('w');
           }
            //System.out.println("I'm waiting here in wl");
            lock.lock.writeLock().lock();
            lock.tid = tid;
            //System.out.println("Wakeup..");
            synchronized(this){
                lock.waitersType.remove(lock.waiters.indexOf(tid));
                lock.waiters.remove(lock.waiters.indexOf(tid));
                //System.out.println("the number of waiters after wakeup: " + lock.waiters.size());
           }//sync
                for(int i =0 ; i < lock.waiters.size();i++){
                    synchronized(Main.g){
                    Main.g.addEdge(lock.waiters.get(i), tid);
                   // System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                    //System.out.println("Graph: "+Main.g.toString());
                    }//sync
                    if(Main.g.hasCycle())
                        restart(lock.waiters.get(i));
                }//for
         }
        else
            return; 
    }// if(wait) ==> for the lock to be released
    else
        if(!getIt){
            lock.lock.writeLock().lock();
            lock.tid = tid;
            for(int i =0 ; i < lock.waiters.size();i++){
                synchronized(Main.g){
                Main.g.addEdge(lock.waiters.get(i), tid);
                //System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
                }
                if(Main.g.hasCycle())
                    restart(lock.waiters.get(i));
            }//for

     }

} // wl

void restart(int tid){
 synchronized(this) {
    Main.rollBack++;
    MyLock lock;
    List<Integer> toRemove = new ArrayList();
    for(int i : locks.keySet()){
       lock = locks.get(i);
       //for all the locks in the lock table delete the restarted thread from the waiters list
       if(lock.waiters.contains(tid)){
           lock.waitersType.remove(lock.waiters.indexOf(tid));
           lock.waiters.remove(lock.waiters.indexOf(tid));
        }

           //lock.sem.release();
           if(lock.lock.isWriteLockedByCurrentThread()){


               //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int j=0;j<lock.waiters.size();j++)
                      Main.g.removeEdge(lock.waiters.get(j), lock.tid);
               //System.out.println("Transaction"+tid+" unlock object "+ i +" in order to restart");
               lock.lock.writeLock().unlock();
                //System.out.println("number of write holders: " + lock.lock.writeLock().getHoldCount());
                //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                //System.out.println("number of waiters: " + lock.lock.getQueueLength());
               toRemove.add(i);

           }
       if(!lock.lock.isWriteLocked())
           if(lock.readers.contains(tid) && lock.lock.getReadLockCount()>0){
                  //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int j=0;j<lock.waiters.size();j++)
                      Main.g.removeEdge(lock.waiters.get(j), tid);
                   // lock.numberOfReaders --;
                  //System.out.println("Transaction"+tid+" unlock object "+ i +" in order to restart");
                  lock.readers.remove(lock.readers.indexOf(tid));
                  lock.lock.readLock().unlock();
                  //System.out.println("number of write holders: " + lock.lock.getWriteHoldCount());
                  //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                  //System.out.println("number of waiters: " + lock.lock.getQueueLength());
                  toRemove.add(i);  

              }//if
    }//for
    Main.g.removeEdges(tid);
    Main.trans[tid].terminate = true;
    //System.out.println("Transaction" + tid + " restarted");

    }//sync
}

/*******************************************************************************
 * Unlock/release the lock on data object oid.
 * @param tid  the transaction id
 * @param oid  the data object id
 */
void ul (int tid, int oid)
{
   MyLock lock = null;
    boolean error = false;
    synchronized(this) {
        try {
            lock = locks.get(oid);                    // find the lock
            if( lock == null)
                System.out.println("println: lock not found");
        } catch(Exception e) {
            System.out.println("lock not found");   // lock not found
        } // try
    }
        if((lock != null) && (lock.lock.isWriteLockedByCurrentThread())){
                  //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int i=0;i<lock.waiters.size();i++)
                      synchronized(Main.g){
                      Main.g.removeEdge(lock.waiters.get(i), lock.tid);
                      }//sync
                   //System.out.println("tid: " + tid + " unlock object: " + oid);
                   lock.lock.writeLock().unlock();
                  //print out 
                  //System.out.println("done with unlock");
                  //System.out.println("number of write holders: " + lock.lock.writeLock().getHoldCount());
                  //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                  //System.out.println("number of waiters: " + lock.lock.getQueueLength());
      }// if lock != null
        else
          if((lock != null) && (lock.readers.size()>0)){
              synchronized(this){
              if(lock.readers.contains(tid)){
                lock.readers.remove(lock.readers.indexOf(tid));
                }
                //remove the edges between the waiters of this lock and the thread that unlocked it
                  for(int i=0;i<lock.waiters.size();i++)
                      synchronized(Main.g){
                      Main.g.removeEdge(lock.waiters.get(i), tid);
                      }
                lock.lock.readLock().unlock();
                //System.out.println("Transaction"+tid+" unlocked shared lock on object "+oid);
                //System.out.println("number of write holders: " + lock.lock.readLock().);
                //System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
                //System.out.println("number of waiters: " + lock.lock.getQueueLength());

              }//if lock.readers
          }//if
    if (error) 
        System.out.println ("Error: ul: no lock for oid = " + oid + " found/owned");
} // ul

1 Ответ

1 голос
/ 10 апреля 2011

Есть несколько проблем с использованием synchronized.

  1. За исключением очень редких случаев, все права на чтение / запись (особенно запись) для общего объекта должны быть синхронизированы на одном и том же объекте. Вы синхронизируете на Main.g, когда пишете в него в некоторых местах, но есть места, где он не синхронизирован (например, внутри if(wait) метода wl).

  2. Насколько я понял цель вашего кода, состояние lock.{waiters|readers} должно быть синхронно с состоянием графа (Main.g). Это означает, что всякий раз, когда вы вносите изменения, скажем, waiters, а затем обновляете Main.g соответственно, эти две операции должны выполняться атомарно. Для этого вам нужно обернуть их в одно непрерывное выражение synchronized. Я вижу, что вы понимаете эту концепцию, потому что вы делаете это в некоторых местах, но затем вы, кажется, упускаете ее в других. Например: в методе rl внутри if(!getIt){ вы обновляете lock.readers, а затем Main.g без какой-либо синхронизации.

В целом, я не могу дать вам ничего конкретного, так как код довольно сложный, поэтому трудно сказать, каково его намерение. Но я думаю, что вы можете решить некоторые проблемы с блокировкой, блокируя большие разделы вашего кода. Например, вы можете просто синхронизировать целые методы rl, wl и ul на this, добавив ключевое слово synchronized в объявление метода, и избавиться от всей мелкозернистой блокировки внутри методов. Мелкозернистая блокировка может повысить производительность, но за счет высокой сложности. Я бы порекомендовал вам сначала начать с более простых схем блокировки, а затем постепенно улучшать их (если вам действительно это нужно!).

...