Как я могу написать семафор на Java, который отдает приоритет предыдущим успешным заявителям? - PullRequest
11 голосов
/ 05 марта 2011

Мне нужен объект семафора с одним разрешением в моей Java-программе, где есть дополнительный метод получения, который выглядит следующим образом:

boolean tryAcquire(int id)

и ведет себя следующим образом: если идентификатор не былвстречались раньше, затем запомните это, а затем просто делайте то, что делает java.util.concurrent.Semaphore.Если с идентификатором было , которое встречалось до и , то это привело к аренде разрешения, тогда присвойте этому потоку приоритет над всеми другими потоками, которые могут ожидать разрешения.Я также хочу дополнительный метод выпуска, такой как:

void release(int id)

, который делает то, что делает java.util.concurrent.Semaphore, плюс также "забывает" об идентификаторе.

Я действительно не знаюкак подойти к этому, но вот начало возможной реализации, но я боюсь, что это ни к чему не приведет:

public final class SemaphoreWithMemory {

    private final Semaphore semaphore = new Semaphore(1, true);
    private final Set<Integer> favoured = new ConcurrentSkipListSet<Integer>();

    public boolean tryAcquire() {
        return semaphore.tryAcquire();
    }

    public synchronized boolean tryAcquire(int id) {
        if (!favoured.contains(id)) {
            boolean gotIt = tryAcquire();
            if (gotIt) {
                favoured.add(id);
                return true;
            }
            else {
                return false;
            }
        }
        else {
            // what do I do here???
        }
    }

    public void release() {
        semaphore.release();
    }

    public synchronized void release(int id) {
        favoured.remove(id);
        semaphore.release();
    }

}

Ответы [ 5 ]

1 голос
/ 06 сентября 2011

Для блокирования модели сбора, что по этому поводу:

public class SemWithPreferred {
    int max;
    int avail;
    int preferredThreads;

    public SemWithPreferred(int max, int avail) {
        this.max = max;
        this.avail = avail;
    }

    synchronized public void get(int id) throws InterruptedException {
        boolean thisThreadIsPreferred = idHasBeenServedSuccessfullyBefore(id);
        if (thisThreadIsPreferred) {
            preferredThreads++;
        }
        while (! (avail > 0 && (preferredThreads == 0 || thisThreadIsPreferred))) {
            wait();
        }
        System.out.println(String.format("granted, id = %d, preferredThreads = %d", id, preferredThreads));
        avail -= 1;
        if (thisThreadIsPreferred) {
            preferredThreads--;
            notifyAll(); // removal of preferred thread could affect other threads' wait predicate
        }
    }

    synchronized public void put() {
        if (avail < max) {
            avail += 1;
            notifyAll();
        }
    }

    boolean idHasBeenServedSuccessfullyBefore(int id) {
        // stubbed out, this just treats any id that is a 
        // multiple of 5 as having been served successfully before 
        return id % 5 == 0;
    }
}
1 голос
/ 05 марта 2011

EDIT:
Сделал некоторый эксперимент. Пожалуйста, смотрите этот ответ для результатов.

В принципе, у Semaphore есть внутренняя очередь потоков, поэтому, как сказал Эндрю, если вы сделаете эту очередь приоритетной и выполните опрос из этой очереди, чтобы выдать разрешения, он, вероятно, будет вести себя так, как вы хотите. Обратите внимание, что вы не можете сделать это с tryAcquire, потому что таким образом потоки не стоят в очереди. Из того, что я вижу, похоже, что для этого нужно взломать класс AbstractQueuedSynchronizer .

Я мог бы также подумать о вероятностном подходе, как это:
(Я не говорю, что приведенный ниже код был бы хорошей идеей! Просто мозговая атака здесь.)

public class SemaphoreWithMemory {

    private final Semaphore semaphore = new Semaphore(1);
    private final Set<Integer> favoured = new ConcurrentSkipListSet<Integer>();
    private final ThreadLocal<Random> rng = //some good rng

    public boolean tryAcquire() {
        for(int i=0; i<8; i++){
            Thread.yield();
            // Tend to waste more time than tryAcquire(int id)
            // would waste.
            if(rng.get().nextDouble() < 0.3){
                return semaphore.tryAcquire();
            }
        }
        return semaphore.tryAcquire();
    }

    public boolean tryAcquire(int id) {
        if (!favoured.contains(id)) {
            boolean gotIt = semaphore.tryAcquire();
            if (gotIt) {
                favoured.add(id);
                return true;
            } else {
                return false;
            }
        } else {
            return tryAquire();
    }
}

Или "избранные" темы тусуются чуть дольше, как это:
РЕДАКТИРОВАТЬ: Оказывается, это была очень плохая идея (как с честным, так и с несправедливым семафором) (подробности см. В моем эксперименте .

    public boolean tryAcquire(int id) {
        if (!favoured.contains(id)) {
            boolean gotIt = semaphore.tryAcquire(5,TimeUnit.MILLISECONDS);
            if (gotIt) {
                favoured.add(id);
                return true;
            } else {
                return false;
            }
        } else {
            return tryAquire();
    }

Полагаю, таким образом вы можете изменить порядок выдачи разрешений, хотя это будет нечестно. Хотя с этим кодом вы, вероятно, будете тратить много времени на производительность ...

0 голосов
/ 06 марта 2011

Мне кажется, что самый простой способ сделать это - не пытаться комбинировать семафоры, а создавать их с нуля поверх мониторов.Это обычно рискованно, но в этом случае, поскольку в java.util.concurrent нет хороших строительных блоков, это самый простой способ сделать это.

Вот что я придумал:

public class SemaphoreWithMemory {

    private final Set<Integer> favouredIDs = new HashSet<Integer>();
    private final Object favouredLock = new Object();
    private final Object ordinaryLock = new Object();
    private boolean available = true;
    private int favouredWaiting = 0;

    /**
    Acquires the permit. Blocks until the permit is acquired.
    */
    public void acquire(int id) throws InterruptedException {
        Object lock;
        boolean favoured = false;

        synchronized (this) {
            // fast exit for uncontended lock
            if (available) {
                doAcquire(favoured, id);
                return;
            }
            favoured = favouredIDs.contains(id);
            if (favoured) {
                lock = favouredLock;
                ++favouredWaiting;
            }
            else {
                lock = ordinaryLock;
            }
        }

        while (true) {
            synchronized (this) {
                if (available) {
                    doAcquire(favoured, id);
                    return;
                }
            }
            synchronized (lock) {
                lock.wait();
            }
        }
    }

    private void doAcquire(boolean favoured, int id) {
        available = false;
        if (favoured) --favouredWaiting;
        else favouredIDs.add(id);
    }

    /**
    Releases the permit.
    */
    public synchronized void release() {
        available = true;
        Object lock = (favouredWaiting > 0) ? favouredLock : ordinaryLock;
        synchronized (lock) {
            lock.notify();
        }
    }

}
0 голосов
/ 06 марта 2011

Я прочитал эту статью Ceki и был заинтересован в том, каким может быть получение смещенного семафора (поскольку я чувствовал, что поведение "смещенной блокировки" будет иметь смысл и в семафорах ...).На моем оборудовании с двумя процессорами и Sun JVM 1.6 это фактически приводит к довольно равномерной аренде.

В любом случае, я также пытался «сместить» аренду семафора с помощью стратегии, которую я написал в другом ответе.Получается, что простое дополнительное выражение yield приводит к значительному смещению.Ваша проблема более сложна, но, возможно, вы можете провести аналогичные тесты с вашей идеей и посмотреть, что вы получите:)

ПРИМЕЧАНИЕ Код ниже основан на коде Ceki здесь

Код:

import java.util.concurrent.*;

public class BiasedSemaphore implements Runnable {
    static ThreadLocal<Boolean> favored = new ThreadLocal<Boolean>(){
        private boolean gaveOut = false;
        public synchronized Boolean initialValue(){
            if(!gaveOut){
                System.out.println("Favored " + Thread.currentThread().getName());
                gaveOut = true;
                return true;
            }
            return false;
        }
    };

    static int THREAD_COUNT = Runtime.getRuntime().availableProcessors();
    static Semaphore SEM = new Semaphore(1);
    static Runnable[] RUNNABLE_ARRAY = new Runnable[THREAD_COUNT];
    static Thread[] THREAD_ARRAY = new Thread[THREAD_COUNT];

    private int counter = 0;

    public static void main(String args[]) throws InterruptedException {
        printEnvironmentInfo();
        execute();
        printResults();
    }

    public static void printEnvironmentInfo() {
        System.out.println("java.runtime.version = "
                + System.getProperty("java.runtime.version"));
        System.out.println("java.vendor          = "
                + System.getProperty("java.vendor"));
        System.out.println("java.version         = "
                + System.getProperty("java.version"));
        System.out.println("os.name              = "
                + System.getProperty("os.name"));
        System.out.println("os.version           = "
                + System.getProperty("os.version"));
    }

    public static void execute() throws InterruptedException {
        for (int i = 0; i < THREAD_COUNT; i++) {
            RUNNABLE_ARRAY[i] = new BiasedSemaphore();
            THREAD_ARRAY[i] = new Thread(RUNNABLE_ARRAY[i]);
            System.out.println("Runnable at "+i + " operated with "+THREAD_ARRAY[i]);
        }

        for (Thread t : THREAD_ARRAY) {
            t.start();
        }
        // let the threads run for a while
        Thread.sleep(10000);

        for (int i = 0; i< THREAD_COUNT; i++) {
            THREAD_ARRAY[i].interrupt();
        }

        for (Thread t : THREAD_ARRAY) {
            t.join();
        }
    }

    public static void printResults() {
        System.out.println("Ran with " + THREAD_COUNT + " threads");
        for (int i = 0; i < RUNNABLE_ARRAY.length; i++) {
            System.out.println("runnable[" + i + "]: " + RUNNABLE_ARRAY[i]);
        }
    }


    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            if (favored.get()) {
                stuff();
            } else {
                Thread.yield();
//                try {
//                    Thread.sleep(1);
//                } catch (InterruptedException e) {
//                    Thread.currentThread().interrupt();
//                }
                stuff();
            }
        }
    }

    private void stuff() {
        if (SEM.tryAcquire()) {
            //favored.set(true);
            counter++;
            try {
                Thread.sleep(10);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            SEM.release();
        } else {
            //favored.set(false);
        }
    }

    public String toString() {
        return "counter=" + counter;
    }
}

Результаты:

java.runtime.version = 1.6.0_21-b07
java.vendor          = Sun Microsystems Inc.
java.version         = 1.6.0_21
os.name              = Windows Vista
os.version           = 6.0
Runnable at 0 operated with Thread[Thread-0,5,main]
Runnable at 1 operated with Thread[Thread-1,5,main]
Favored Thread-0
Ran with 2 threads
runnable[0]: counter=503
runnable[1]: counter=425

Пробовал 30 секунд вместо 10:

java.runtime.version = 1.6.0_21-b07
java.vendor          = Sun Microsystems Inc.
java.version         = 1.6.0_21
os.name              = Windows Vista
os.version           = 6.0
Runnable at 0 operated with Thread[Thread-0,5,main]
Runnable at 1 operated with Thread[Thread-1,5,main]
Favored Thread-1
Ran with 2 threads
runnable[0]: counter=1274
runnable[1]: counter=1496

PS: Похоже, "тусоваться" было очень плохой идеей.Когда я попытался вызвать SEM.tryAcquire(1,TimeUnit.MILLISECONDS); для избранных тем и SEM.tryAcquire() для непривилегированных тем, непривилегированные потоки получили разрешение почти в 5 раз больше, чем избранная тема!

Кроме того, я хотел бы добавитьчто эти результаты измеряются только в одной конкретной ситуации, поэтому неясно, как эти меры ведут себя в других ситуациях.

0 голосов
/ 06 марта 2011

Предполагая, что вы хотите, чтобы потоки ожидали, я взломал решение, которое не является идеальным, но оно должно подойти.

Идея состоит в том, чтобы иметь два семафора и флаг "фаворита ждет".

Каждый поток, который пытается получить SemaphoreWithMemory, сначала пытается получить «favouredSemaphore».«Избранная» ветка хранит семафор, а не избранная немедленно его выпускает.Таким образом, избранный поток блокирует все остальные входящие потоки, как только он приобрел этот семафор.

Затем должен быть получен второй «обычный семафор», чтобы завершить работу.Но непривилегированный поток затем снова проверяет, нет ли предпочтительного потока, ожидающего использование переменной volatile).Если никто не ждет, он просто продолжает;если кто-то ждет, он освобождает normalSemaphore и рекурсивно вызывает повторное получение вызовов.

Я не совсем уверен, что скрывающихся условий гонки нет.Если вы хотите быть уверены, вам, возможно, следует провести рефакторинг своего кода для передачи «рабочих элементов» в очередь с приоритетами, где другой поток получает рабочий элемент с наивысшим приоритетом и выполняет этот код.

public final class SemaphoreWithMemory {

private volatile boolean favouredAquired = false;
private final Semaphore favouredSemaphore = new Semaphore(1, true);
private final Semaphore normalSemaphore = new Semaphore(1, true);
private final Set<Integer> favoured = new ConcurrentSkipListSet<Integer>();

public void acquire() throws InterruptedException {
    normalSemaphore.acquire();
}

public void acquire(int id) throws InterruptedException {
    boolean idIsFavoured = favoured.contains(id);
    favouredSemaphore.acquire();
    if (!idIsFavoured) {
        favouredSemaphore.release();
    } else {
        favouredAquired = true;
    }
    normalSemaphore.acquire();

    // check again that there is no favoured thread waiting
    if (!idIsFavoured) {
        if (favouredAquired) {
            normalSemaphore.release();
            acquire(); // starving probability!
        } else {
            favoured.add(id);
        }
    }

}

public void release() {
    normalSemaphore.release();
    if (favouredAquired) {
        favouredAquired = false;
        favouredSemaphore.release();
    }
}

public void release(int id) {
    favoured.remove(id);
    release();
}

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...