Проблема чтения входных данных из нескольких источников и их сериализации в один поток предпочтительно решается с использованием SelectableChannel
и Selector
. Это, однако, требует, чтобы все источники могли предоставлять выбираемый канал. Это может или не может иметь место.
Если выбираемые каналы недоступны, вы можете решить это с помощью одного потока , разрешив реализации чтения выполнить следующее: Для каждого входного потока is
, проверьте, если is.available() > 0
, и если да, верните is.read()
. Повторяйте эту процедуру, пока в некотором входном потоке не будут доступны данные.
Однако этот метод имеет два основных недостатка:
Не все реализации из InputStream
реализует available()
таким образом, что он возвращает 0, если и только если read()
заблокирует. В результате, естественно, данные не могут быть прочитаны из этого потока, даже если is.read()
вернет значение. Вопрос о том, следует ли это рассматривать как ошибку, сомнителен, поскольку в документации просто говорится, что она должна возвращать «оценку» количества доступных байтов.
Он использует так называемый "занятый цикл", который в основном означает, что вам нужно либо включить спящий цикл (что приводит к задержке чтения), либо излишне перегружать процессор.
Ваш третий вариант - иметь дело с блокирующими чтениями , порождающими один поток для каждого входного потока . Однако это потребует тщательной синхронизации и, возможно, некоторых накладных расходов, если у вас очень много входных потоков для чтения. Код ниже является первой попыткой решить эту проблему. Я ни в коем случае не уверен, что он достаточно синхронизирован или управляет потоками наилучшим образом.
import java.io.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class MergedInputStream extends InputStream {
AtomicInteger openStreamCount;
BlockingQueue<Integer> buf = new ArrayBlockingQueue<Integer>(1);
InputStream[] sources;
public MergedInputStream(InputStream... sources) {
this.sources = sources;
openStreamCount = new AtomicInteger(sources.length);
for (int i = 0; i < sources.length; i++)
new ReadThread(i).start();
}
public void close() throws IOException {
String ex = "";
for (InputStream is : sources) {
try {
is.close();
} catch (IOException e) {
ex += e.getMessage() + " ";
}
}
if (ex.length() > 0)
throw new IOException(ex.substring(0, ex.length() - 1));
}
public int read() throws IOException {
if (openStreamCount.get() == 0)
return -1;
try {
return buf.take();
} catch (InterruptedException e) {
throw new IOException(e);
}
}
private class ReadThread extends Thread {
private final int src;
public ReadThread(int src) {
this.src = src;
}
public void run() {
try {
int data;
while ((data = sources[src].read()) != -1)
buf.put(data);
} catch (IOException ioex) {
} catch (InterruptedException e) {
}
openStreamCount.decrementAndGet();
}
}
}