Java: объединение InputStreams - PullRequest
4 голосов
/ 14 октября 2010

Моя цель - создать (или использовать существующую) реализацию InputStream (скажем, MergeInputStream), которая будет пытаться читать из нескольких InputStreams и возвращать первый результат. После этого он снимет блокировку и прекратит чтение из всех InputStreams до следующего вызова mergeInputStream.read (). Я был очень удивлен, что я не нашел такого инструмента. Дело в том, что все исходные InputStreams не совсем конечны (не файл, например, но System.in, сокет или тому подобное), поэтому я не могу использовать SequenceInputReader. Я понимаю, что для этого, вероятно, потребуется какой-то многопоточный механизм, но я абсолютно не знаю, как это сделать. Я пытался гуглить, но безрезультатно.

Ответы [ 2 ]

3 голосов
/ 14 октября 2010

Проблема чтения входных данных из нескольких источников и их сериализации в один поток предпочтительно решается с использованием SelectableChannel и Selector. Это, однако, требует, чтобы все источники могли предоставлять выбираемый канал. Это может или не может иметь место.

Если выбираемые каналы недоступны, вы можете решить это с помощью одного потока , разрешив реализации чтения выполнить следующее: Для каждого входного потока is, проверьте, если is.available() > 0, и если да, верните is.read(). Повторяйте эту процедуру, пока в некотором входном потоке не будут доступны данные.

Однако этот метод имеет два основных недостатка:

  1. Не все реализации из InputStream реализует available() таким образом, что он возвращает 0, если и только если read() заблокирует. В результате, естественно, данные не могут быть прочитаны из этого потока, даже если is.read() вернет значение. Вопрос о том, следует ли это рассматривать как ошибку, сомнителен, поскольку в документации просто говорится, что она должна возвращать «оценку» количества доступных байтов.

  2. Он использует так называемый "занятый цикл", который в основном означает, что вам нужно либо включить спящий цикл (что приводит к задержке чтения), либо излишне перегружать процессор.

Ваш третий вариант - иметь дело с блокирующими чтениями , порождающими один поток для каждого входного потока . Однако это потребует тщательной синхронизации и, возможно, некоторых накладных расходов, если у вас очень много входных потоков для чтения. Код ниже является первой попыткой решить эту проблему. Я ни в коем случае не уверен, что он достаточно синхронизирован или управляет потоками наилучшим образом.

import java.io.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class MergedInputStream extends InputStream {

    AtomicInteger openStreamCount;
    BlockingQueue<Integer> buf = new ArrayBlockingQueue<Integer>(1);
    InputStream[] sources;

    public MergedInputStream(InputStream... sources) {
        this.sources = sources;
        openStreamCount = new AtomicInteger(sources.length);
        for (int i = 0; i < sources.length; i++)
            new ReadThread(i).start();
    }


    public void close() throws IOException {
        String ex = "";
        for (InputStream is : sources) {
            try {
                is.close();
            } catch (IOException e) {
                ex += e.getMessage() + " ";
            }
        }
        if (ex.length() > 0)
            throw new IOException(ex.substring(0, ex.length() - 1));
    }


    public int read() throws IOException {
        if (openStreamCount.get() == 0)
            return -1;

        try {
            return buf.take();
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }


    private class ReadThread extends Thread {

        private final int src;
        public ReadThread(int src) {
            this.src = src;
        }

        public void run() {
            try {
                int data;
                while ((data = sources[src].read()) != -1)
                    buf.put(data);
            } catch (IOException ioex) {
            } catch (InterruptedException e) {
            }
            openStreamCount.decrementAndGet();
        }
    }
}
1 голос
/ 14 октября 2010

Я могу придумать три способа сделать это:

  • Использовать неблокирующий ввод / вывод ( Документация API ).Это самое чистое решение.
  • Несколько потоков, по одному на каждый объединенный входной поток.Потоки блокируют метод read() соответствующего входного потока, а затем уведомляют объект MergeInputStream, когда данные становятся доступными.Метод read() в MergedInputStream будет ожидать этого уведомления, а затем считывать данные из соответствующего потока.
  • Один поток с занятым циклом.Ваши MergeInputStream.read() методы должны были бы проверять цикл available() метода каждого объединенного входного потока.Если данные отсутствуют, поспите несколько мс.Повторяйте, пока данные не станут доступны в одном из объединенных входных потоков.
...