У меня есть эта реализация для двухфазной блокировки. Проблема в том, что он отлично работает в большинстве сценариев, но не во всех. Я выяснил, что проблема возникла из-за моего использования Synchronized, который используется неправильно каким-то образом! Мне нужно, чтобы порядок операций был таким:
- Поток, разблокирующий любую блокировку, должен полностью завершить эту фазу (метод ul), прежде чем любой другой поток начнет блокировать такую же блокировку.
- Поток, который ожидал блокировки, когда он просыпается, должен получить всех официантов этой блокировки, прежде чем он начнет выполнять свои операции.
- любые addEdge или removeEdge должны быть синхронизированы, потому что я использую один и тот же графовый объект со всеми потоками.
Вот код:
class LockTable{
private HashMap<Integer,MyLock> locks;
public LockTable(){
locks= new HashMap<Integer,MyLock>();
}
/*******************************************************************************
* Acquire a shared/read lock on data object oid.
* @param tid the transaction id
* @param oid the data object id
*/
void rl (int tid, int oid) throws InterruptedException
{
MyLock lock=null;
boolean wait = false;
boolean getIt = false;
synchronized(this) {
try {
lock = locks.get(oid); // find the lock
if((lock != null) && (lock.lock.isWriteLocked())){
wait = true;
}
if(lock == null){
getIt = true;
lock = new MyLock(tid, true);
lock.lock.readLock().lock();
lock.readers.add(tid);
locks.put(oid, lock);
}
} catch(Exception e) {
System.out.println(e.getStackTrace()); // lock not found, so oid is not locked;
} // try
}//synch
if (wait){
synchronized(this){
//System.out.println("Transaction " + tid + " is waiting..");
lock.waiters.add(tid);
lock.waitersType.add('s');
Main.g.addEdge(tid, lock.tid);
//System.out.println("Edge has been added "+tid + "==>" + lock.tid);
}//sync
if(Main.g.hasCycle()){
//System.out.println("Detect Cycle in rl..Executing Restart");
restart(tid);
}
//to exclude the restarted thread
if(!Main.trans[tid].terminate){
lock.lock.readLock().lock();
lock.readers.add(tid);
synchronized(this){
lock.waitersType.remove(lock.waiters.indexOf(tid));
lock.waiters.remove(lock.waiters.indexOf(tid));
}//sync
for(int i =0 ; i < lock.waiters.size();i++){
if(lock.waitersType.get(i) == 'w'){
synchronized(Main.g){
Main.g.addEdge(lock.waiters.get(i), tid);
//System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
}//sync
if(Main.g.hasCycle())
restart(lock.waiters.get(i));
}//if lock.waitersType
}//for
}//if terminate
else
return;
}
else
if(!getIt){
//System.out.println("Getting Shared Lock without Waiting");
lock.lock.readLock().lock();
lock.readers.add(tid);
for(int i =0 ; i < lock.waiters.size();i++){
if(lock.waitersType.get(i) == 'w'){
synchronized(Main.g){
Main.g.addEdge(lock.waiters.get(i), tid);
//System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
}
if(Main.g.hasCycle())
restart(lock.waiters.get(i));
}//if lock.waitersType
}
}
} // rl
/*******************************************************************************
* Acquire an exclusive/write lock on data object oid.
* @param tid the transaction id
* @param oid the data object id
*/
void wl (int tid, int oid) throws InterruptedException
{
//type to determine the last lock type in order
//to be able to remove the edges from waitfor graph
int type = 0;
MyLock lock = null;
boolean wait = false;
boolean getIt = false;
synchronized(this) {
try {
lock = locks.get(oid); // find the lock
if(lock != null && (lock.lock.isWriteLocked() || lock.readers.size() > 0))
{
wait = true;
}
if(lock == null){
getIt = true;
lock = new MyLock(tid);
lock.lock.writeLock().lock();
locks.put(oid,lock);
}
} catch(Exception e) {
System.out.println(e.getStackTrace()); // lock not found, so oid is not locked;
} // try
}
if (wait){
//System.out.println("Transaction " + tid + " is waiting..");
synchronized(this) {
if(lock.lock.isWriteLocked()){
Main.g.addEdge(tid, lock.tid);
//System.out.println("Edge has been added "+tid + "==>" + lock.tid);
}
else{
type = 1;
for(int reader : lock.readers){
Main.g.addEdge(tid, reader);
//System.out.println("Edge has been added "+tid + "==>" + reader);
}
}//else
if(Main.g.hasCycle()){
//System.out.println("Detect Cycle");
restart(tid);
}//if
//System.out.println("Graph: "+Main.g.toString());
}//sync
if(!Main.trans[tid].terminate){
synchronized(this){
lock.waiters.add(tid);
lock.waitersType.add('w');
}
//System.out.println("I'm waiting here in wl");
lock.lock.writeLock().lock();
lock.tid = tid;
//System.out.println("Wakeup..");
synchronized(this){
lock.waitersType.remove(lock.waiters.indexOf(tid));
lock.waiters.remove(lock.waiters.indexOf(tid));
//System.out.println("the number of waiters after wakeup: " + lock.waiters.size());
}//sync
for(int i =0 ; i < lock.waiters.size();i++){
synchronized(Main.g){
Main.g.addEdge(lock.waiters.get(i), tid);
// System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
//System.out.println("Graph: "+Main.g.toString());
}//sync
if(Main.g.hasCycle())
restart(lock.waiters.get(i));
}//for
}
else
return;
}// if(wait) ==> for the lock to be released
else
if(!getIt){
lock.lock.writeLock().lock();
lock.tid = tid;
for(int i =0 ; i < lock.waiters.size();i++){
synchronized(Main.g){
Main.g.addEdge(lock.waiters.get(i), tid);
//System.out.println("Edge has been added "+lock.waiters.get(i) + "==>" + tid);
}
if(Main.g.hasCycle())
restart(lock.waiters.get(i));
}//for
}
} // wl
void restart(int tid){
synchronized(this) {
Main.rollBack++;
MyLock lock;
List<Integer> toRemove = new ArrayList();
for(int i : locks.keySet()){
lock = locks.get(i);
//for all the locks in the lock table delete the restarted thread from the waiters list
if(lock.waiters.contains(tid)){
lock.waitersType.remove(lock.waiters.indexOf(tid));
lock.waiters.remove(lock.waiters.indexOf(tid));
}
//lock.sem.release();
if(lock.lock.isWriteLockedByCurrentThread()){
//remove the edges between the waiters of this lock and the thread that unlocked it
for(int j=0;j<lock.waiters.size();j++)
Main.g.removeEdge(lock.waiters.get(j), lock.tid);
//System.out.println("Transaction"+tid+" unlock object "+ i +" in order to restart");
lock.lock.writeLock().unlock();
//System.out.println("number of write holders: " + lock.lock.writeLock().getHoldCount());
//System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
//System.out.println("number of waiters: " + lock.lock.getQueueLength());
toRemove.add(i);
}
if(!lock.lock.isWriteLocked())
if(lock.readers.contains(tid) && lock.lock.getReadLockCount()>0){
//remove the edges between the waiters of this lock and the thread that unlocked it
for(int j=0;j<lock.waiters.size();j++)
Main.g.removeEdge(lock.waiters.get(j), tid);
// lock.numberOfReaders --;
//System.out.println("Transaction"+tid+" unlock object "+ i +" in order to restart");
lock.readers.remove(lock.readers.indexOf(tid));
lock.lock.readLock().unlock();
//System.out.println("number of write holders: " + lock.lock.getWriteHoldCount());
//System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
//System.out.println("number of waiters: " + lock.lock.getQueueLength());
toRemove.add(i);
}//if
}//for
Main.g.removeEdges(tid);
Main.trans[tid].terminate = true;
//System.out.println("Transaction" + tid + " restarted");
}//sync
}
/*******************************************************************************
* Unlock/release the lock on data object oid.
* @param tid the transaction id
* @param oid the data object id
*/
void ul (int tid, int oid)
{
MyLock lock = null;
boolean error = false;
synchronized(this) {
try {
lock = locks.get(oid); // find the lock
if( lock == null)
System.out.println("println: lock not found");
} catch(Exception e) {
System.out.println("lock not found"); // lock not found
} // try
}
if((lock != null) && (lock.lock.isWriteLockedByCurrentThread())){
//remove the edges between the waiters of this lock and the thread that unlocked it
for(int i=0;i<lock.waiters.size();i++)
synchronized(Main.g){
Main.g.removeEdge(lock.waiters.get(i), lock.tid);
}//sync
//System.out.println("tid: " + tid + " unlock object: " + oid);
lock.lock.writeLock().unlock();
//print out
//System.out.println("done with unlock");
//System.out.println("number of write holders: " + lock.lock.writeLock().getHoldCount());
//System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
//System.out.println("number of waiters: " + lock.lock.getQueueLength());
}// if lock != null
else
if((lock != null) && (lock.readers.size()>0)){
synchronized(this){
if(lock.readers.contains(tid)){
lock.readers.remove(lock.readers.indexOf(tid));
}
//remove the edges between the waiters of this lock and the thread that unlocked it
for(int i=0;i<lock.waiters.size();i++)
synchronized(Main.g){
Main.g.removeEdge(lock.waiters.get(i), tid);
}
lock.lock.readLock().unlock();
//System.out.println("Transaction"+tid+" unlocked shared lock on object "+oid);
//System.out.println("number of write holders: " + lock.lock.readLock().);
//System.out.println("number of read holders: " + lock.lock.getReadHoldCount());
//System.out.println("number of waiters: " + lock.lock.getQueueLength());
}//if lock.readers
}//if
if (error)
System.out.println ("Error: ul: no lock for oid = " + oid + " found/owned");
} // ul