Почему больше кода Java не использует PipedInputStream / PipedOutputStream? - PullRequest
48 голосов
/ 27 января 2009

Я недавно обнаружил эту идиому, и мне интересно, есть ли что-то, что я пропускаю. Я никогда не видел, чтобы это использовалось. Почти весь Java-код, с которым я работал в дикой природе, способствует смещению данных в строку или буфер, а не как в этом примере (например, с использованием HttpClient и XML API):

    final LSOutput output; // XML stuff initialized elsewhere
    final LSSerializer serializer;
    final Document doc;
    // ...
    PostMethod post; // HttpClient post request
    final PipedOutputStream source = new PipedOutputStream();
    PipedInputStream sink = new PipedInputStream(source);
    // ...
    executor.execute(new Runnable() {
            public void run() {
                output.setByteStream(source);
                serializer.write(doc, output);
                try {
                    source.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }});

    post.setRequestEntity(new InputStreamRequestEntity(sink));
    int status = httpClient.executeMethod(post);

Этот код использует технику стиля Unix-piping, чтобы предотвратить сохранение нескольких копий данных XML в памяти. Он использует поток вывода HTTP Post и API загрузки / сохранения DOM для сериализации XML-документа в качестве содержимого HTTP-запроса. Насколько я могу сказать, это сводит к минимуму использование памяти с очень небольшим лишним кодом (всего несколько строк для Runnable, PipedInputStream и PipedOutputStream).

Итак, что не так с этой идиомой? Если в этой идиоме нет ничего плохого, почему я ее не видел?

РЕДАКТИРОВАТЬ: чтобы уточнить, PipedInputStream и PipedOutputStream заменяют шаблонную буферную копию, которая появляется везде, и они также позволяют обрабатывать входящие данные одновременно с записью обработанных данных. Они не используют каналы ОС.

Ответы [ 8 ]

44 голосов
/ 27 января 2009

Из Javadocs :

Как правило, данные считываются из объекта PipedInputStream одним потоком, а данные записываются в соответствующий PipedOutputStream другим потоком. Попытка использовать оба объекта из одного потока не рекомендуется, так как это может привести к блокировке потока.

Это может частично объяснить, почему оно не используется чаще.

Я бы предположил, что другая причина заключается в том, что многие разработчики не понимают его цели / выгоды.

7 голосов
/ 27 января 2009

В вашем примере вы создаете два потока для выполнения работы, которая может быть выполнена одним. И введение задержек ввода / вывода в миксе.

У вас есть лучший пример? Или я просто ответил на ваш вопрос.


Чтобы включить некоторые комментарии (по крайней мере, мой взгляд на них) в основной ответ:

  • Параллельность вносит сложность в приложение. Вместо того, чтобы иметь дело с одним линейным потоком данных, теперь вам нужно заботиться о последовательности независимых потоков данных. В некоторых случаях дополнительная сложность может быть оправдана, особенно если вы можете использовать несколько ядер / процессоров для выполнения работы, интенсивно использующей процессор.
  • Если вы находитесь в ситуации, когда вы можете извлечь выгоду из параллельных операций, обычно есть лучший способ скоординировать поток данных между потоками. Например, передача объектов между потоками с использованием параллельной очереди, а не упаковка потоков в потоках объектов.
  • Когда конвейерный поток может быть хорошим решением, когда несколько потоков выполняют обработку текста, например конвейер Unix (например, grep | sort).

В конкретном примере конвейерный поток позволяет использовать существующий класс реализации RequestEntity, предоставляемый HttpClient. Я считаю, что лучшим решением является создание нового класса реализации, как показано ниже, потому что в конечном итоге этот пример представляет собой последовательную операцию, которая не может извлечь выгоду из сложности и накладных расходов параллельной реализации. Хотя я показываю RequestEntity как анонимный класс, возможность повторного использования означает, что это должен быть класс первого класса.

post.setRequestEntity(new RequestEntity()
{
    public long getContentLength()
    {
        return 0-1;
    }

    public String getContentType()
    {
        return "text/xml";
    }

    public boolean isRepeatable()
    {
        return false;
    }

    public void writeRequest(OutputStream out) throws IOException
    {
        output.setByteStream(out);
        serializer.write(doc, output);
    }
});
6 голосов
/ 27 января 2009

Я тоже только недавно обнаружил классы PipedInputStream / PipedOutputStream.

Я разрабатываю плагин Eclipse, который должен выполнять команды на удаленном сервере через SSH. Я использую JSch , и Channel API читает из входного потока и записывает в выходной поток. Но мне нужно передать команды через входной поток и прочитать ответы из выходного потока. Вот где приходит PipedInput / OutputStream.

import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import com.jcraft.jsch.Channel;

Channel channel;
PipedInputStream channelInputStream = new PipedInputStream();
PipedOutputStream channelOutputStream = new PipedOutputStream();

channel.setInputStream(new PipedInputStream(this.channelOutputStream));
channel.setOutputStream(new PipedOutputStream(this.channelInputStream));
channel.connect();

// Write to channelInputStream
// Read from channelInputStream

channel.disconnect();
4 голосов
/ 27 января 2009

Кроме того, вернемся к исходному примеру: нет, он также точно не минимизирует использование памяти. Дерево (и) DOM построено, буферизация в памяти выполнена - хотя это лучше, чем реплики полных байтовых массивов, это не намного лучше. Но буферизация в этом случае будет медленнее; и создается дополнительный поток - вы не можете использовать пару PipedInput / OutputStream из одного потока.

Иногда PipedXxxStreams полезны, но причина, по которой они больше не используются, заключается в том, что довольно часто они не являются правильным решением. Они подходят для межпотокового общения, и именно здесь я использовал их для того, что это стоит. Просто для этого не так много вариантов использования, учитывая, как SOA устанавливает большинство таких границ между сервисами, а не между потоками.

2 голосов
/ 11 февраля 2016

Вот пример использования, где трубы имеют смысл:

Предположим, у вас есть сторонняя библиотека, такая как xslt mapper или crypto lib, которая имеет такой интерфейс: doSomething (inputStream, outputStream). И вы не хотите буферизовать результат перед отправкой по проводам. Apache и другие клиенты запрещают прямой доступ к потоковому выходному потоку. Самое близкое, что вы можете получить, - это получить выходной поток - со смещением после записи заголовков - в объекте объекта запроса. Но так как это скрыто, недостаточно передать входной поток и выходной поток сторонней библиотеке. Трубы являются хорошим решением этой проблемы.

Кстати, я написал инверсию API-клиента HTTP-клиента Apache [PipedApacheClientOutputStream] , который предоставляет интерфейс OutputStream для HTTP POST с использованием HTTP-клиента Apache Commons 4.3.4. Это пример, где Piped Streams может иметь смысл.

2 голосов
/ 27 января 2009

Я пытался использовать эти классы некоторое время назад, я забыл детали. Но я обнаружил, что их реализация смертельно несовершенна. Я не могу вспомнить, что это было, но у меня есть подлое воспоминание о том, что это могло быть состояние гонки, что означало, что они иногда зашли в тупик (И да, конечно, я использовал их в отдельных потоках: их просто нельзя использовать в один поток и не были предназначены для).

Я мог бы взглянуть на их исходный код и посмотреть, смогу ли я понять, в чем проблема.

1 голос
/ 19 октября 2013

В каналах java.io слишком много переключения контекста (чтение / запись на байт), а их аналог java.nio требует от вас некоторого фона NIO и правильного использования каналов и прочего, это моя собственная реализация каналов, использующая блокировку очередь, которая для одного производителя / потребителя будет работать быстро и хорошо масштабироваться:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }
0 голосов
/ 27 января 2009

Итак, что не так с этой идиомой? Если нет ничего плохого в этой идиоме, почему я не видел это?

РЕДАКТИРОВАТЬ: уточнить, PipedInputStream и PipedOutputStream заменяет шаблонная буферная копия, появляется везде, и они также позволяют обрабатывать входящие данные одновременно с выписыванием обработанные данные. Они не используют ОС трубы.

Вы указали, что он делает, но не указали, почему вы это делаете.

Если вы считаете, что это либо сократит используемые ресурсы (процессор / память), либо улучшит производительность, это тоже не сработает. Однако это сделает ваш код более сложным.

По сути, у вас есть решение без проблемы, для которой оно решается.

...