У меня есть код, который вычисляет трехмерные поверхности путем разделения трехмерной области на блок и выполняет относительно дорогую операцию для каждого блока. Я пытаюсь распараллелить его, используя несколько рабочих потоков для обработки блоков. Важным условием является то, что нет двух рабочих, обрабатывающих смежные блоки одновременно.
Я добился хорошего уровня распараллеливания, разделив домен на восемь, при этом каждый работник обрабатывает 1/8 части домена и использует Семафор для контроля доступа в «опасную зону», где могут конфликтовать два работника.
static final Semaphore available = new Semaphore(1, true);
class BoxGenerator implements Runnable {
...
void find_box(Box box) {
if(in_danger_zone(box)) {
try {
available.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
do_expensive_action(box);
if(in_danger_zone(box)) {
available.release();
}
}
}
Я хотел бы иметь более разумный способ выдачи разрешений. Что-то вроде
for each current permit issued:
if safe_to_proceed(current_permit,box) the OK
else block
и, мы надеемся, использовать абстракцию более высокого уровня, например, семафор, вместо того, чтобы убедиться, что я получаю код более низкого уровня с правильным ожиданием () или Condition.await ().
A 2D разрешением может быть выдача разрешения, если x, y запрошенного элемента не находится в пределах 1 от координат любой текущей задачи.
Вот моя первая попытка, она решает проблему предотвращения конфликтов, но все блокировки снижают производительность.
public class PermitIssuer<T> {
final BiPredicate<T,T> predicate;
final Collection<T> current ;
Lock lock = new ReentrantLock();
Condition hold = lock.newCondition();
public PermitIssuer(BiPredicate<T, T> predicate,int size) {
super();
this.predicate = predicate;
this.current = new ArrayBlockingQueue<T>(size);
}
boolean safeToIssuePermit(final T requester) {
boolean safe = current.stream().allMatch(t -> predicate.test(t,requester));
return safe;
}
public void aquire(final T requester) {
while(true) {
if(safeToIssuePermit(requester)) {
current.add(requester);
return;
}
lock.lock();
try {
hold.awaitUninterruptibly();
} finally {
lock.unlock();
}
}
}
public void release(final T requester) {
current.remove(requester);
lock.lock();
try {
hold.signalAll();
} finally {
lock.unlock();
}
}
}