Возвращаемое значение метода Java (PipedInputStream) () - PullRequest
4 голосов
/ 24 сентября 2011

Я пытаюсь написать кусок кода разблокировки для чтения из PipedInputStream.Он в основном проверяет, есть ли что-нибудь для чтения перед вызовом API блокирующего чтения:

int n = 0;
if ((n = pipedInputStream_.available()) > 0) {
     pipedInputStream_.read(...)
}

Чтение через Java API doc Я не могу точно сказать, какой должна быть эта проверка, посколькувозможные значения равны нулю (подразумевается отсутствие данных или закрыт / прерван поток) или больше нуля.Так как же вызывающий может узнать, есть ли что-нибудь для чтения вообще?

"Возвращает количество байтов, которые могут быть прочитаны из этого входного потока без блокировки, или 0, если этот входной поток былзакрывается, вызывая его метод close (), или если канал не подключен или сломан. "

Глядя на источник, кажется, что единственными значениями являются ноль или больше нуля.

public synchronized int available() throws IOException {
    if(in < 0)
        return 0;
    else if(in == out)
        return buffer.length;
    else if (in > out)
        return in - out;
    else
        return in + buffer.length - out;
}

Ответы [ 2 ]

2 голосов
/ 24 сентября 2011

Если available() возвращает ноль, в настоящее время нет доступных байтов для чтения. Согласно приведенной вами документации, это может быть сделано по нескольким причинам:

  • Труба была закрыта.
  • Труба сломана.
  • Все ранее доступные входные данные (если есть) уже были использованы.

Нулевое возвращаемое значение из available() может означать, что произошла ошибка, подразумевая, что вы никогда не сможете читать больше данных через канал в будущем, но вы не можете сказать, наверняка здесь, потому что ноль может указывать на то третье условие выше, где блокировка на InputStream#read() может в конечном итоге дать больше данных, которые соответствующая сторона OutputStream протолкнет через канал.

Я не вижу возможности опроса PipedInputStream с available(), пока не станет доступно больше данных, потому что вы никогда не сможете отличить описанные выше случаи терминала (первый и второй) от считывателя быть более голодным, чем писатель. Как и многие другие потоковые интерфейсы, здесь вы также должны попытаться использовать и быть готовыми к сбою . Это ловушка; InputStream#read() будет блокироваться, но до тех пор, пока вы не совершите блокировку при попытке чтения, вы не сможете заметить, что больше нет ввода.

Невозможно основывать ваши потребительские действия на available(). Если он возвращает положительное число, нужно прочитать что-то , но, конечно, даже того, что доступно сейчас, может быть недостаточно для удовлетворения потребителя. Вам будет проще управлять вашим приложением, если вы выделите поток для использования InputStream в режиме блокировки и пропустите опрос с помощью available(). Пусть InputStream#read() будет вашим единственным оракулом здесь.

0 голосов
/ 18 октября 2013

Мне нужен был фильтр для перехвата медленных соединений, где мне нужно было как можно скорее закрыть соединения с БД, поэтому я сначала использовал каналы Java, но когда я посмотрел поближе на их реализацию, все было синхронизировано, поэтому я в итоге создал свой собственный QueueInputStream с использованием небольшого буфера и Очередь блокировки для помещения буфера в очередь, когда она была заполнена, блокируется, за исключением случаев, когда для условий блокировки, используемых в LinkedBlockingQueue, которые с помощью небольшого буфера должны быть дешевыми, этот класс предназначен для использования только для одного производитель и потребитель за экземпляр:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{-1};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }

}
...