Как сообщить потоку PipedInputStream, что поток PipedOutputStream записал последний байт? - PullRequest
6 голосов
/ 29 марта 2012

Как правильно завершить работу на выходном конце трубы? Мне нужен поток записи, чтобы завершить или выполнить какую-то другую работу, пока поток чтения читает все записанные данные до конца.

Должен ли я закрыть трубу в конце записи или как?

ОБНОВЛЕНИЕ 1

Я хочу уточнить ... Правильно ли я считаю, что согласно приведенным ответам, поведение при проектировании каналов не предполагает какого-либо изящного завершения?

т.е. после открытия единственный способ остановить трубопровод - это разорвать трубу?

Обычные потоки ожидают окончания потокового сигнала, когда метод read() возвращает -1. Правильно ли думать, что это никогда не происходит с потоковыми каналами?

Ответы [ 3 ]

5 голосов
/ 06 апреля 2012

Да, закрытие PipedOutputStream приводит к -1 на PipedInputStream.

Выглядит довольно грациозно для меня!Вот мой SSCCE :

import java.io.*;
import java.nio.charset.*;

public class SOPipe
{
    public static void main(String[] args) throws Exception
    {
        PipedOutputStream os = new PipedOutputStream();
        PipedInputStream is = new PipedInputStream(os);

        ReaderThread readerThread = new ReaderThread(is);
        WriterThread writerThread = new WriterThread(os);

        readerThread.start();
        writerThread.start();

        readerThread.join();
        writerThread.join();

        System.out.println("Both Reader and Writer completed.");
        System.out.println("Main method returning normally now.");
    }

    private static final Charset LATIN1 = Charset.forName("latin1");

    public static class WriterThread extends Thread
    {
        private final PipedOutputStream _os;

        public WriterThread(PipedOutputStream os)
        {
            _os = os;
        }

        public void run()
        {
            try
            {
                String msg = "Ceci n'est pas une pipe";
                byte[] msgBytes = msg.getBytes(LATIN1);
                System.out.println("WriterThread sending message: " + msg);
                for(int i = 0; i < msgBytes.length; i++)
                {
                    _os.write(msgBytes, i, 1);
                    System.out.println("WriterThread wrote a byte!");
                    _os.flush();
                }
                _os.close();
                System.out.println("[COMPLETED] WriterThread");
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
    }

    public static class ReaderThread extends Thread
    {
        private final PipedInputStream _is;

        public ReaderThread(PipedInputStream is)
        {
            _is = is;
        }

        public void run()
        {
            try
            {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                byte[] buffer = new byte[1];
                int read;
                while ((read = _is.read(buffer, 0, 1)) != -1)
                {
                    System.out.println("ReaderThread read a byte!");
                    baos.write(buffer, 0, read);
                }
                System.out.println("[COMPLETED] ReaderThread; received: " 
                        + new String(baos.toByteArray(), LATIN1));
                _is.close();
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
    }
}
4 голосов
/ 06 апреля 2012

Достаточно просто закрыть выходной поток.

0 голосов
/ 29 марта 2012

Вы можете расширить класс PipedOutputStream и переопределить его метод write(), чтобы добавить свою пользовательскую логику после записи всех байтов в поточный поток вывода.

    public class CustomPipedOutput extends PipedOutputStream {

          @Override
          public void write(byte[] byteArray, int offset, int length){

            super.write(byteArray, offset, length);

            //-- Code to be executed after writing bytes
          }

          @Override
          public void close(){

            super.close();

            //-- Code to be executed after closing piped input stream
          }

   }

Аналогично, можно расширить другиеметоды, если требуется соответственно.

...