Разблокировать блокировку из потока, которому он не принадлежит, или изменить дизайн, чтобы избежать этого? - PullRequest
3 голосов
/ 04 февраля 2010

У меня есть архивный объект, который управляет различными байтовыми массивами и раздает InputStream с и OutputStream с для их чтения и записи. Каждому байтовому массиву соответствует ReentrantReadWriteLock.

Конструктор подкласса InputStream, который создает архив, получает блокировку для соответствующих данных, тогда как close() снимает блокировку. Теперь проблема:

Предположим, что у меня есть задача, которая будет выполняться в другом потоке, который нуждается в вводе из архива, получает InputStream в качестве аргумента перед его запуском и отвечает за закрытие потока после его завершения. Эти требования несовместимы с механизмом блокировки, используемым моим архивом данных, потому что

  1. InputStream создается в потоке, отличном от того, в котором он закрыт.
  2. A ReentrantReadWriteLock должен быть освобожден потоком, которому он принадлежит.

Переписать части программы, которые ожидают ввода InputStream, чтобы избежать # 1, неудобно и делает эти части менее гибкими. Использование своего рода блокировки, которая разрешает смену владельца, позволило бы мне избежать # 2, но я не вижу каких-либо блокировок в Java API, которые могли бы справиться с этим, и я не очень заинтересован в том, чтобы собрать пользовательскую блокировку, если я Т должен.

Какие есть решения для этой проблемы?

Ответы [ 5 ]

4 голосов
/ 04 февраля 2010

Самый чистый способ сделать это - дать клиентам что-то, кроме подкласса InputStream, даже если это означает немного больший рефакторинг.В вашем текущем дизайне вы не можете рассчитывать на то, что ваши клиенты снимают блокировку, что если один из них этого не сделает?Это не очень хорошая инкапсуляция.

Пусть объект архива управляет блокировками и предоставляет своим клиентам простой API для получения данных .Даже если это означает изменение некоторых API.

Проблема блокировки разных потоков и освобождения одного и того же объекта - явный признак того, что в вашем текущем дизайне что-то не так.Работайте над вышеупомянутым рефакторингом, это поможет вам в дальнейшем.

2 голосов
/ 04 февраля 2010

На основании вашего ответа на мой вопрос. Вы можете просто отложить создание (и блокировку) InputStream, передав пользовательский класс, используемый для производства InputStream, в ThreadExecutorService. Затем поток в службе запрашивает InputStream у этого объекта, который сначала создает его (поскольку он не существует), а затем все готово.

Например:

public interface InputStreamMaker {
    public InputStream getOrCreateInputStream();
}

Тогда в вашем основном классе, который начинает все:

static class InputStreamMakerImpl implements InputStreamMaker {
    private final Manager manager; // Your manager class (or use a normal Inner class on manager)
    private InputStream is;

    // other variables needed to define how to create input stream for the particular task here

    InputStreamMakerImpl(Manager manager) {
        this.manager = manager;
    }

    public InputStream getOrCreateInputStream() {
         if (is == null) {
             // pass this object - so manager will have all the details to create the right stream
             is = manager.createNewStream(this);
         }

         return is;
    }
}

и затем в главном потоке ...

...some method...
InputStreamMakerImpl maker = new InputStreamMakerImpl(manager /* or this */);
// set values needed in maker for your manager class to create the right input stream
// start the worker and pass the InputStreamMaker (maker) as the parameter instead of the InputStream

Этот внутренний класс может иметь дополнительную информацию, необходимую для создания правильного потока для работника. Теперь InputStream создается / открывается в правом потоке, и этот поток также может закрыть его и разблокировать блокировку.

Редактировать : Перечитывая ваш вопрос, я вижу, что вы, возможно, не захотите изменять какие-либо объекты Worker, которые уже принимают InputStream. Тогда вы все еще можете сделать это:

  1. Создание пользовательского подкласса InputStream, который содержит все значения, которые я определил выше как InputStreamMakerImpl.

  2. В пользовательском InputStream есть внутренний метод getInputStream(), который выполняет то, что я определил для getOrCreateInputStream() выше.

  3. Тогда все методы для InputStream просто делегируют вызовы, например:

    public int read() throws IOException {
        return getInputStream().read();
    }
    
1 голос
/ 06 февраля 2010

java.io.InputStream имеет всего девять методов. Вы уже переопределили один метод, т.е. close().

Я бы порекомендовал вам ленивую блокировку, т.е. не агрессивно получать блокировку в конструкторе. Для этого сделайте ReentrantReadWriteLock доступным в вашем подклассе InputStream и получите закрытый метод с именем doAcquireLock(), который пытается получить блокировку. Затем переопределите все остальные восемь методов InputStream, так что во всех переопределенных методах сначала вызовите doAcquireLock(), а затем делегируйте операцию супер.

пример:

public int read() 
    doAcquireLock(); // This method get's the lock
    return super.read();
}

С этой блокировкой будет получен поток, который сначала вызывает любой из вышеупомянутых восьми методов и будет также отвечать за закрытие.

Просто чтобы убедиться, что вы не попали в тупиковую ситуацию, нужно следить за тем, чтобы клиент в конце вызывал метод close, чтобы снять блокировку. Кроме того, ни один из методов не должен вызываться после вызова метода close с использованием логического индикатора, чтобы узнать, если поток уже закрыт (устарел).

0 голосов
/ 09 февраля 2010

Другое решение, на которое я наткнулся, размышляя об этом: один из способов рассмотрения проблемы состоит в том, что она вызвана повторным входом замков. ReentrantReadWriteLock отслеживает владение замком для обработки повторного входа. Если я хочу получить блокировки при создании потока и снять блокировки при закрытии потока, а также закрыть потоки в потоках, отличных от тех, в которых они были созданы, то блокировка с повторным входом не будет работать, поскольку неправильный поток будет владеть блокировкой потока.

Одним из способов решения этой проблемы является использование счетной блокировки, но принудительная установка всей блокировки наружу. Во-первых, блокировка подсчета: Состояния являются целыми числами -1,0,1, ...., где -1 означает запись, 0 означает разблокировку, а положительные n подсчитывают количество одновременных операций чтения. Таким образом, записи по-прежнему являются эксклюзивными, но блокировки не зависят от потока. Вынуждение всех блокировок наружу: у нас есть методы типа exists(), где нам нужно получить блокировку чтения, но exists() вызывается некоторыми методами, которые уже имеют блокировку записи. Итак, мы разбили кишки exists() на защищенные existsImpl(), которые следует вызывать только тогда, когда блокировка уже удерживается (то есть внутренне, другими методами *Impl(). Таким образом, после получения блокировки Вызываются только разблокирующие методы Impl, и мы избегаем необходимости повторного входа.

Это все хорошо, за исключением того, что в конечном итоге он становится чрезвычайно сложным, чтобы гарантировать, что вы не облажаетесь при его реализации - вызывая открытый заблокированный метод, когда вы должны вызывать защищенный незафиксированный - - и тем самым создавая тупик. Во-вторых, навязывание всех ваших замков наружу означает, что вы дольше удерживаете эти замки, так что это (вероятно) не подходит для параллелизма.

Итак, я попробовал это, но решил не использовать.

0 голосов
/ 09 февраля 2010

Решение # 2:

Если вам не нужно удерживать блокировку до конца потока, вы можете реализовать свой пользовательский InputStream, как показано ниже, который вы можете дать своим клиентам. Теперь, когда любой поток должен прочитать, он получит блокировку, затем прочитает и, наконец, автоматически снимет блокировку, как только чтение будет завершено. Ваш пользовательский OutputStream должен быть аналогичным образом реализован с блокировкой записи.

.

public class LockingInputStream extends InputStream {
    private final InputStream byteArrayInputStream;
    private final Lock r;

    public LockingInputStream(
                    InputStream byteArrayInputStream, 
                    ReentrantReadWriteLock rwl) {
        this.byteArrayInputStream = byteArrayInputStream;
        this.r = rwl.readLock();
    }

    @Override
    public int read() throws IOException {
        r.lock();
        try {
            return byteArrayInputStream.read();
        } finally { r.unlock(); }
    }
    @Override
    public int available() throws IOException {
        r.lock();
        try {
            return byteArrayInputStream.available();
        } finally { r.unlock(); }
    }
    ....
    @Override
    public void close() throws IOException {
        r.lock();
        try {
            byteArrayInputStream.close();
        } finally { r.unlock(); }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...