Семафор, который выдает разрешения на основании уже выданных разрешений - PullRequest
1 голос
/ 02 мая 2020

У меня есть код, который вычисляет трехмерные поверхности путем разделения трехмерной области на блок и выполняет относительно дорогую операцию для каждого блока. Я пытаюсь распараллелить его, используя несколько рабочих потоков для обработки блоков. Важным условием является то, что нет двух рабочих, обрабатывающих смежные блоки одновременно.

Я добился хорошего уровня распараллеливания, разделив домен на восемь, при этом каждый работник обрабатывает 1/8 части домена и использует Семафор для контроля доступа в «опасную зону», где могут конфликтовать два работника.

static final Semaphore available = new Semaphore(1, true);
class BoxGenerator implements Runnable {
    ...
    void find_box(Box box) {
        if(in_danger_zone(box)) {
            try {
                available.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        do_expensive_action(box);

        if(in_danger_zone(box)) {
             available.release();
        }
    }
}

Я хотел бы иметь более разумный способ выдачи разрешений. Что-то вроде

for each current permit issued:
    if safe_to_proceed(current_permit,box) the OK
    else block

и, мы надеемся, использовать абстракцию более высокого уровня, например, семафор, вместо того, чтобы убедиться, что я получаю код более низкого уровня с правильным ожиданием () или Condition.await ().

A 2D разрешением может быть выдача разрешения, если x, y запрошенного элемента не находится в пределах 1 от координат любой текущей задачи.


Вот моя первая попытка, она решает проблему предотвращения конфликтов, но все блокировки снижают производительность.

public class PermitIssuer<T> {

    final BiPredicate<T,T> predicate;
    final Collection<T> current ; 
    Lock lock = new ReentrantLock();
    Condition hold = lock.newCondition();


    public PermitIssuer(BiPredicate<T, T> predicate,int size) {
        super();
        this.predicate = predicate;
        this.current = new ArrayBlockingQueue<T>(size);
    }

    boolean safeToIssuePermit(final T requester) {
        boolean safe = current.stream().allMatch(t -> predicate.test(t,requester));
        return safe;
    }

    public void aquire(final T requester) {
        while(true) {
            if(safeToIssuePermit(requester)) {
                current.add(requester);
                return;
            }
            lock.lock();
            try {
                hold.awaitUninterruptibly();
            } finally {
                lock.unlock();
            }
        }
    }

    public void release(final T requester) {
        current.remove(requester);
        lock.lock();
        try {
            hold.signalAll();
        } finally { 
            lock.unlock();
        }
    }
}
...