Один из простых способов передачи данных между потоками - использовать реализации интерфейса BlockingQueue<E>
, расположенного в пакете java.util.concurrent
.
В этих интерфейсах есть методы для добавления элементов в коллекцию с различным поведением:
add(E)
: добавляет, если возможно, иначе выдает исключение
boolean offer(E)
: возвращает true, если элемент был добавлен, иначе false
boolean offer(E, long, TimeUnit)
: пытается добавить элемент, ожидая указанное количество времени
put(E)
: блокирует вызывающий поток, пока элемент не будет добавлен
Он также определяет методы для поиска элементов с похожим поведением:
take()
: блокировать, пока не появится элемент
poll(long, TimeUnit)
: извлекает элемент или возвращает ноль
Наиболее часто используемые реализации: ArrayBlockingQueue
, LinkedBlockingQueue
и SynchronousQueue
.
Первый, ArrayBlockingQueue
, имеет фиксированный размер, определяемый параметром, передаваемым его конструктору.
Второй, LinkedBlockingQueue
, имеет неограниченный размер. Он всегда будет принимать любые элементы, то есть offer
немедленно вернет true, add
никогда не вызовет исключение.
Третий, и для меня самый интересный, SynchronousQueue
, это точно труба. Вы можете думать об этом как о очереди с размером 0. Она никогда не сохранит элемент: эта очередь будет принимать элементы, только если есть какой-то другой поток, пытающийся извлечь элементы из него. И наоборот, операция поиска вернет элемент, только если другой поток пытается его выдвинуть.
Чтобы выполнить домашнее задание для синхронизации , выполненной исключительно с семафорами , вы можете вдохновиться описанием, которое я дал вам о SynchronousQueue, и написать нечто очень похожее:
class Pipe<E> {
private E e;
private final Semaphore read = new Semaphore(0);
private final Semaphore write = new Semaphore(1);
public final void put(final E e) {
write.acquire();
this.e = e;
read.release();
}
public final E take() {
read.acquire();
E e = this.e;
write.release();
return e;
}
}
Обратите внимание, что этот класс демонстрирует поведение, аналогичное тому, что я описал для SynchronousQueue.
Как только методы put(E)
вызываются, он получает семафор записи, который останется пустым, так что другой вызов того же метода будет блокироваться в его первой строке. Затем этот метод сохраняет ссылку на передаваемый объект и освобождает семафор чтения. Этот выпуск позволит любому потоку, вызывающему метод take()
, продолжить работу.
Первый шаг метода take()
, естественно, состоит в том, чтобы получить семафор чтения, чтобы запретить любому другому потоку одновременно получать элемент. После того, как элемент был извлечен и сохранен в локальной переменной ( упражнение: что произойдет, если эта строка, E e = this.e, будет удалена? ), метод освобождает семафор записи, так что метод put(E)
может быть вызван снова любым потоком и возвращает то, что было сохранено в локальной переменной.
В качестве важного замечания обратите внимание, что ссылка на передаваемый объект хранится в закрытом поле , а методы take()
и put(E)
оба final . Это чрезвычайно важно, и часто упускается. Если бы эти методы не были окончательными (или, что еще хуже, поле не было приватным), наследующий класс мог бы изменить поведение take()
и put(E)
, нарушая контракт.
Наконец, вы можете избежать необходимости объявлять локальную переменную в методе take()
, используя try {} finally {}
следующим образом:
class Pipe<E> {
// ...
public final E take() {
try {
read.acquire();
return e;
} finally {
write.release();
}
}
}
Здесь смысл этого примера, если просто показать использование try/finally
, которое остается незамеченным среди неопытных разработчиков. Очевидно, что в этом случае нет реальной выгоды.
Черт, я в основном закончил твою домашнюю работу для тебя. Во время возмездия - и для вас, чтобы проверить свои знания о семафорах - почему бы вам не реализовать некоторые другие методы, определенные в контракте BlockingQueue? Например, вы можете реализовать метод offer(E)
и take(E, long, TimeUnit)
!
Удачи.