Можно ли читать из InputStream с таймаутом? - PullRequest
139 голосов
/ 30 апреля 2009

В частности, проблема заключается в том, чтобы написать метод, подобный этому:

int maybeRead(InputStream in, long timeout)

, где возвращаемое значение совпадает с in.read (), если данные доступны в течение миллисекунд 'timeout', и -2 в противном случае. Перед возвратом метода все порожденные потоки должны завершиться.

Чтобы избежать аргументов, предмет здесь java.io.InputStream, как документировано Sun (любая версия Java). Обратите внимание, что это не так просто, как кажется. Ниже приведены некоторые факты, которые подтверждаются непосредственно документацией Sun.

  1. Метод in.read () может быть непрерывным.

  2. Обертывание InputStream в Reader или InterruptibleChannel не помогает, потому что все эти классы могут делать, это вызывать методы InputStream. Если бы можно было использовать эти классы, можно было бы написать решение, которое просто выполняет ту же логику непосредственно на InputStream.

  3. Для in.available () всегда допустимо возвращать 0.

  4. Метод in.close () может блокировать или ничего не делать.

  5. Нет общего способа убить другой поток.

Ответы [ 7 ]

77 голосов
/ 01 мая 2013

Использование inputStream.available ()

Для System.in.available () всегда допустимо возвращать 0.

Я обнаружил обратное - он всегда возвращает наилучшее значение для количества доступных байтов. Javadoc для InputStream.available():

Returns an estimate of the number of bytes that can be read (or skipped over) 
from this input stream without blocking by the next invocation of a method for 
this input stream.

Оценка неизбежна из-за сроков / устаревания. Эта цифра может быть разовой заниженной, поскольку постоянно поступают новые данные. Однако он всегда «догоняет» при следующем вызове - он должен учитывать все поступившие данные, за исключением того, что он поступил только в момент нового вызова. Постоянно возвращать 0, когда есть данные, не выполняется условие выше.

Первое предупреждение: за доступность отвечают конкретные подклассы InputStream ()

InputStream - абстрактный класс. У него нет источника данных. Это бессмысленно иметь доступные данные. Следовательно, javadoc для available() также гласит:

The available method for class InputStream always returns 0.

This method should be overridden by subclasses.

И действительно, конкретные классы входного потока переопределяют available (), предоставляя значимые значения, а не константы 0.

Второе предупреждение: убедитесь, что вы используете возврат каретки при вводе ввода в Windows.

При использовании System.in ваша программа получает ввод только тогда, когда командная оболочка передает его. Если вы используете перенаправление файлов / pipe (например, somefile> java myJavaApp или somecommand | java myJavaApp), то входные данные обычно передаются немедленно. Однако, если вы вводите вручную, передача данных может быть отложена. Например. В оболочке windows cmd.exe данные буферизуются в оболочке cmd.exe. Данные передаются только в исполняющую Java-программу после возврата каретки (control-m или <enter>). Это ограничение среды исполнения. Конечно, InputStream.available () будет возвращать 0 до тех пор, пока оболочка буферизует данные - это правильное поведение; на данный момент нет доступных данных. Как только данные доступны из оболочки, метод возвращает значение> 0. Примечание: Cygwin также использует cmd.exe.

Самое простое решение (без блокировки, поэтому время ожидания не требуется)

Просто используйте это:

    byte[] inputData = new byte[1024];
    int result = is.read(inputData, 0, is.available());  
    // result will indicate number of bytes read; -1 for EOF with no data read.

ИЛИ эквивалентно,

    BufferedReader br = new BufferedReader(new InputStreamReader(System.in, Charset.forName("ISO-8859-1")),1024);
    // ...
         // inside some iteration / processing logic:
         if (br.ready()) {
             int readCount = br.read(inputData, bufferOffset, inputData.length-bufferOffset);
         }

Richer Solution (максимально заполняет буфер в течение периода ожидания)

Объявите это:

public static int readInputStreamWithTimeout(InputStream is, byte[] b, int timeoutMillis)
     throws IOException  {
     int bufferOffset = 0;
     long maxTimeMillis = System.currentTimeMillis() + timeoutMillis;
     while (System.currentTimeMillis() < maxTimeMillis && bufferOffset < b.length) {
         int readLength = java.lang.Math.min(is.available(),b.length-bufferOffset);
         // can alternatively use bufferedReader, guarded by isReady():
         int readResult = is.read(b, bufferOffset, readLength);
         if (readResult == -1) break;
         bufferOffset += readResult;
     }
     return bufferOffset;
 }

Тогда используйте это:

    byte[] inputData = new byte[1024];
    int readCount = readInputStreamWithTimeout(System.in, inputData, 6000);  // 6 second timeout
    // readCount will indicate number of bytes read; -1 for EOF with no data read.
64 голосов
/ 23 марта 2012

Предполагая, что ваш поток не поддерживается сокетом (поэтому вы не можете использовать Socket.setSoTimeout()), я думаю, что стандартный способ решения этого типа проблем - это использование Future.

Предположим, у меня есть следующий исполнитель и потоки:

    ExecutorService executor = Executors.newFixedThreadPool(2);
    final PipedOutputStream outputStream = new PipedOutputStream();
    final PipedInputStream inputStream = new PipedInputStream(outputStream);

У меня есть писатель, который пишет некоторые данные, затем ждет 5 секунд, прежде чем записать последний фрагмент данных и закрыть поток:

    Runnable writeTask = new Runnable() {
        @Override
        public void run() {
            try {
                outputStream.write(1);
                outputStream.write(2);
                Thread.sleep(5000);
                outputStream.write(3);
                outputStream.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };
    executor.submit(writeTask);

Обычный способ прочесть это следующим образом. Чтение блокируется на неопределенный срок для данных, и это завершается через 5 с:

    long start = currentTimeMillis();
    int readByte = 1;
    // Read data without timeout
    while (readByte >= 0) {
        readByte = inputStream.read();
        if (readByte >= 0)
            System.out.println("Read: " + readByte);
    }
    System.out.println("Complete in " + (currentTimeMillis() - start) + "ms");

который выводит:

Read: 1
Read: 2
Read: 3
Complete in 5001ms

Если бы существовала более фундаментальная проблема, например, что писатель не отвечает, читатель заблокирует его навсегда. Если я заверну чтение в будущем, то смогу контролировать время ожидания следующим образом:

    int readByte = 1;
    // Read data with timeout
    Callable<Integer> readTask = new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            return inputStream.read();
        }
    };
    while (readByte >= 0) {
        Future<Integer> future = executor.submit(readTask);
        readByte = future.get(1000, TimeUnit.MILLISECONDS);
        if (readByte >= 0)
            System.out.println("Read: " + readByte);
    }

который выводит:

Read: 1
Read: 2
Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
    at java.util.concurrent.FutureTask.get(FutureTask.java:91)
    at test.InputStreamWithTimeoutTest.main(InputStreamWithTimeoutTest.java:74)

Я могу поймать исключение TimeoutException и выполнить любую необходимую очистку.

18 голосов
/ 20 октября 2011

Я бы усомнился в постановке проблемы, а не просто принял ее вслепую. Вам нужны только тайм-ауты из консоли или по сети. Если в последнем случае у вас есть Socket.setSoTimeout() и HttpURLConnection.setReadTimeout(), которые оба делают именно то, что требуется, при условии, что вы их правильно настроили, когда их строили / приобретали. Оставляя его на произвольном этапе позже в приложении, когда все, что у вас есть, это InputStream - плохой дизайн, приводящий к очень неловкой реализации.

18 голосов
/ 06 сентября 2011

Если ваш InputStream поддерживается Socket, вы можете установить время ожидания Socket (в миллисекундах), используя setSoTimeout . Если вызов read () не разблокируется в течение указанного времени, он выдаст исключение SocketTimeoutException.

Просто убедитесь, что вы вызываете setSoTimeout для Socket перед выполнением вызова read ().

7 голосов
/ 30 апреля 2009

Я не использовал классы из пакета Java NIO, но , похоже, , они могут быть здесь полезны. В частности, java.nio.channels.Channels и java.nio.channels.InterruptibleChannel .

5 голосов
/ 01 мая 2009

Вот способ получить NIO FileChannel из System.in и проверить доступность данных, используя тайм-аут, который является частным случаем проблемы, описанной в вопросе. Запустите его на консоли, не вводите никаких данных и ждите результатов. Он был успешно протестирован под Java 6 в Windows и Linux.

import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;

public class Main {

    static final ByteBuffer buf = ByteBuffer.allocate(4096);

    public static void main(String[] args) {

        long timeout = 1000 * 5;

        try {
            InputStream in = extract(System.in);
            if (! (in instanceof FileInputStream))
                throw new RuntimeException(
                        "Could not extract a FileInputStream from STDIN.");

            try {
                int ret = maybeAvailable((FileInputStream)in, timeout);
                System.out.println(
                        Integer.toString(ret) + " bytes were read.");

            } finally {
                in.close();
            }

        } catch (Exception e) {
            throw new RuntimeException(e);
        }

    }

    /* unravels all layers of FilterInputStream wrappers to get to the
     * core InputStream
     */
    public static InputStream extract(InputStream in)
            throws NoSuchFieldException, IllegalAccessException {

        Field f = FilterInputStream.class.getDeclaredField("in");
        f.setAccessible(true);

        while( in instanceof FilterInputStream )
            in = (InputStream)f.get((FilterInputStream)in);

        return in;
    }

    /* Returns the number of bytes which could be read from the stream,
     * timing out after the specified number of milliseconds.
     * Returns 0 on timeout (because no bytes could be read)
     * and -1 for end of stream.
     */
    public static int maybeAvailable(final FileInputStream in, long timeout)
            throws IOException, InterruptedException {

        final int[] dataReady = {0};
        final IOException[] maybeException = {null};
        final Thread reader = new Thread() {
            public void run() {                
                try {
                    dataReady[0] = in.getChannel().read(buf);
                } catch (ClosedByInterruptException e) {
                    System.err.println("Reader interrupted.");
                } catch (IOException e) {
                    maybeException[0] = e;
                }
            }
        };

        Thread interruptor = new Thread() {
            public void run() {
                reader.interrupt();
            }
        };

        reader.start();
        for(;;) {

            reader.join(timeout);
            if (!reader.isAlive())
                break;

            interruptor.start();
            interruptor.join(1000);
            reader.join(1000);
            if (!reader.isAlive())
                break;

            System.err.println("We're hung");
            System.exit(1);
        }

        if ( maybeException[0] != null )
            throw maybeException[0];

        return dataReady[0];
    }
}

Интересно, что при запуске программы внутри NetBeans 6.5, а не в консоли время ожидания вообще не работает, и вызов System.exit () фактически необходим для уничтожения потоков зомби. Что происходит, так это то, что поток прерывания блокирует (!) Вызов функции reader.interrupt (). Другая тестовая программа (здесь не показана) дополнительно пытается закрыть канал, но это тоже не работает.

4 голосов
/ 30 апреля 2009

Как сказал jt, NIO - лучшее (и правильное) решение. Если вы действительно застряли с InputStream, вы можете либо

  1. Создать поток, чья эксклюзивная работа заключается в чтении из InputStream и помещении результата в буфер, который можно прочитать из вашего исходного потока без блокировки. Это должно работать хорошо, если у вас есть только один экземпляр потока. В противном случае вы можете убить поток, используя устаревшие методы в классе Thread, хотя это может привести к утечке ресурсов.

  2. Положитесь на isAvailable, чтобы указать данные, которые могут быть прочитаны без блокировки. Однако в некоторых случаях (например, в случае с Sockets) может потребоваться потенциально блокирующее чтение для isAvailable, чтобы сообщить что-то отличное от 0.

...