Передача данных между потоками с помощью Java - PullRequest
7 голосов
/ 09 апреля 2011

Я пишу многопоточное приложение, имитирующее кинотеатр. Каждый вовлеченный человек имеет свой собственный поток, и параллелизм должен быть полностью сделан семафорами. Единственная проблема, с которой я сталкиваюсь, - как связать потоки, чтобы они могли общаться (например, через канал).

Например:

Клиент [1], который является потоком, приобретает семафор, который позволяет ему подойти к Кассе. Теперь Клиент [1] должен сообщить оператору кассы, что он хочет посмотреть фильм «Х». Затем BoxOfficeAgent [1], также являющийся потоком, должен убедиться, что фильм не заполнен, и либо продать билет, либо сказать Заказчику [1], что нужно выбрать другой фильм.

Как передать эти данные назад и вперед, сохраняя при этом параллелизм с семафорами?

Кроме того, единственный класс, который я могу использовать из java.util.concurrent, это семафор класс.

Ответы [ 2 ]

8 голосов
/ 09 апреля 2011

Один из простых способов передачи данных между потоками - использовать реализации интерфейса BlockingQueue<E>, расположенного в пакете java.util.concurrent.

В этих интерфейсах есть методы для добавления элементов в коллекцию с различным поведением:

  • add(E): добавляет, если возможно, иначе выдает исключение
  • boolean offer(E): возвращает true, если элемент был добавлен, иначе false
  • boolean offer(E, long, TimeUnit): пытается добавить элемент, ожидая указанное количество времени
  • put(E): блокирует вызывающий поток, пока элемент не будет добавлен

Он также определяет методы для поиска элементов с похожим поведением:

  • take(): блокировать, пока не появится элемент
  • poll(long, TimeUnit): извлекает элемент или возвращает ноль

Наиболее часто используемые реализации: ArrayBlockingQueue, LinkedBlockingQueue и SynchronousQueue.

Первый, ArrayBlockingQueue, имеет фиксированный размер, определяемый параметром, передаваемым его конструктору.

Второй, LinkedBlockingQueue, имеет неограниченный размер. Он всегда будет принимать любые элементы, то есть offer немедленно вернет true, add никогда не вызовет исключение.

Третий, и для меня самый интересный, SynchronousQueue, это точно труба. Вы можете думать об этом как о очереди с размером 0. Она никогда не сохранит элемент: эта очередь будет принимать элементы, только если есть какой-то другой поток, пытающийся извлечь элементы из него. И наоборот, операция поиска вернет элемент, только если другой поток пытается его выдвинуть.

Чтобы выполнить домашнее задание для синхронизации , выполненной исключительно с семафорами , вы можете вдохновиться описанием, которое я дал вам о SynchronousQueue, и написать нечто очень похожее:

class Pipe<E> {
  private E e;

  private final Semaphore read = new Semaphore(0);
  private final Semaphore write = new Semaphore(1);

  public final void put(final E e) {
    write.acquire();
    this.e = e;
    read.release();
  }

  public final E take() {
    read.acquire();
    E e = this.e;
    write.release();
    return e;
  }
}

Обратите внимание, что этот класс демонстрирует поведение, аналогичное тому, что я описал для SynchronousQueue.

Как только методы put(E) вызываются, он получает семафор записи, который останется пустым, так что другой вызов того же метода будет блокироваться в его первой строке. Затем этот метод сохраняет ссылку на передаваемый объект и освобождает семафор чтения. Этот выпуск позволит любому потоку, вызывающему метод take(), продолжить работу.

Первый шаг метода take(), естественно, состоит в том, чтобы получить семафор чтения, чтобы запретить любому другому потоку одновременно получать элемент. После того, как элемент был извлечен и сохранен в локальной переменной ( упражнение: что произойдет, если эта строка, E e = this.e, будет удалена? ), метод освобождает семафор записи, так что метод put(E) может быть вызван снова любым потоком и возвращает то, что было сохранено в локальной переменной.

В качестве важного замечания обратите внимание, что ссылка на передаваемый объект хранится в закрытом поле , а методы take() и put(E) оба final . Это чрезвычайно важно, и часто упускается. Если бы эти методы не были окончательными (или, что еще хуже, поле не было приватным), наследующий класс мог бы изменить поведение take() и put(E), нарушая контракт.

Наконец, вы можете избежать необходимости объявлять локальную переменную в методе take(), используя try {} finally {} следующим образом:

class Pipe<E> {
  // ...
  public final E take() {
    try {
      read.acquire();
      return e;
    } finally {
      write.release();
    }
  }
}

Здесь смысл этого примера, если просто показать использование try/finally, которое остается незамеченным среди неопытных разработчиков. Очевидно, что в этом случае нет реальной выгоды.

Черт, я в основном закончил твою домашнюю работу для тебя. Во время возмездия - и для вас, чтобы проверить свои знания о семафорах - почему бы вам не реализовать некоторые другие методы, определенные в контракте BlockingQueue? Например, вы можете реализовать метод offer(E) и take(E, long, TimeUnit)!

Удачи.

1 голос
/ 09 апреля 2011

Рассматривайте это с точки зрения общей памяти с блокировкой чтения / записи.

  1. Создать буфер для размещения сообщения.
  2. Доступ к буферу должен контролироваться с помощью блокировки / семафора.
  3. Используйте этот буфер для связи между потоками.

Привет

П

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...