Канал потока ввода и вывода в Java - PullRequest
9 голосов
/ 13 декабря 2008

Есть ли у кого-нибудь хорошие предложения для создания объекта Pipe в Java, который равен и InputStream, и и OutputStream, поскольку Java не имеет множественного наследования, и оба потока являются абстрактными классами вместо интерфейсов?

Основная потребность заключается в том, чтобы иметь один объект, который можно передать вещам, которым требуется либо InputStream, либо OutputStream для передачи вывода из одного потока в ввод для другого.

Ответы [ 6 ]

8 голосов
/ 14 декабря 2008

Кажется, смысл этого вопроса упущен. Если я вас правильно понимаю, вам нужен объект, который функционирует как InputStream в одном потоке и OutputStream в другом, чтобы создать средство связи между двумя потоками.

Возможно, один из ответов - использовать композицию вместо наследования (что в любом случае рекомендуется). Создайте канал, содержащий PipedInputStream и PipedOutputStream, связанные друг с другом, с помощью методов getInputStream () и getOutputStream ().

Вы не можете напрямую передать объект Pipe чему-либо, нуждающемуся в потоке, но вы можете передать возвращаемое значение метода get, чтобы сделать это.

Это работает для вас?

5 голосов
/ 13 декабря 2008

java.io.PipedOutputStream и java.io.PipedInputStream выглядят как классы для использования в этом сценарии. Они предназначены для совместного использования для передачи данных между потоками.

Если вы действительно хотите, чтобы какой-то один объект проходил вокруг, он должен содержать один из них и выставлять их через геттеры.

3 голосов
/ 13 декабря 2008

Думаю, это довольно распространенная вещь. Смотрите этот вопрос.

Простой способ записать содержимое Java InputStream в OutputStream

1 голос
/ 16 мая 2009
1 голос
/ 13 декабря 2008

Вы не можете создать класс, который является производным от InputStream и OutputStream, потому что они не являются интерфейсами и имеют общие методы, а Java не допускает множественное наследование (компилятор не знает, вызывать ли InputStream.close() или OutputStream.close(), если вы звоните close() на ваш новый объект).

Другая проблема - буфер. Java хочет выделить статический буфер для данных (который не изменяется). Это означает, что когда вы используете `java.io.PipedXxxStream ', запись данных в него в конечном итоге блокируется, если вы не используете два разных потока.

Так что ответ от Apocalisp правильный: вы должны написать цикл копирования.

Я предлагаю вам включить Apache commons-io в свой проект, который содержит множество вспомогательных процедур только для таких задач (копирование данных между потоками, файлами, строками и всеми их комбинациями).

0 голосов
/ 19 октября 2013

Мне пришлось реализовать фильтр для медленных соединений с сервлетами, поэтому в основном я обернул выходной поток сервлета в QueueOutputStream, который будет добавлять каждый байт (в небольших буферах) в очередь, а затем выводить эти небольшие буферы на второй выход stream, так что в некотором смысле это действует как поток ввода / вывода, IMHO, это лучше, чем каналы JDK, которые не будут так хорошо масштабироваться, в основном в стандартной реализации JDK слишком много переключений контекста (на чтение / запись), блокировка очередь просто идеальна для сценария одного производителя / потребителя:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...