Это похоже на конструкцию производителя / потребителя. Похоже, что вы ошибаетесь, IO должен управлять алгоритмом. Поскольку вы остаетесь очень абстрактным в том, что на самом деле делает ваш код, мне нужно придерживаться этого.
Допустим, ваш «распределенный алгоритм» работает с данными типа T
; это означает, что он может быть описан как Consumer<T>
(имя метода в этом интерфейсе accept(T value)
). Поскольку он может работать одновременно, вы хотите создать несколько его экземпляров; обычно это делается с помощью ExecutorService
. Класс Executors
предоставляет хороший набор фабричных методов для его создания, давайте использовать Executors.newFixedThreadPool(parallelism)
.
Ваш поток "IO" выполняется для создания входных данных для алгоритма, то есть это Supplier<T>
. Мы можем запустить его в Executors.newSingleThreadExecutor()
.
Мы соединяем эти два, используя BlockingQueue<T>
; это коллекция FIFO. Поток ввода-вывода помещает элементы, а экземпляры алгоритма убирают следующий, который становится доступным.
Это заставляет всю установку выглядеть примерно так:
void run() {
int parallelism = 4; // or whatever
ExecutorService algorithmExecutor = Executors.newFixedThreadPool(parallelism);
ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
// this queue will accept up to 4 elements
// this might need to be changed depending on performance of each
BlockingQueue<T> queue = new ArrayBlockingQueue<T>(parallelism);
ioExecutor.submit(new IoExecutor(queue));
// take element from queue
T nextElement = getNextElement(queue);
while (nextElement != null) {
algorithmExecutor.submit(() -> new AlgorithmInstance().accept(nextElement));
nextElement = getNextElement(queue);
if (nextElement == null) break;
}
// wait until algorithms have finished running and cleanup
algorithmExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.YEARS);
algorithmExecutor.shutdown();
ioExecutor.shutdown(); // the io thread should have terminated by now already
}
T getNextElement(BlockingQueue<T> queue) {
int timeOut = 1; // adjust depending on your IO
T result = null;
while (true) {
try {
result = queue.poll(timeOut, TimeUnits.SECONDS);
} catch (TimeoutException e) {} // retry indefinetely, we will get a value eventually
}
return result;
}
Теперь это не на самом деле ответ на ваш вопрос, потому что вы хотели знать, как поток ввода-вывода может быть уведомлен, когда он может продолжить чтение данных.
Это достигается ограничением BlockingQueue<>
, которое не будет принимать элементы после того, как это было достигнуто, то есть поток ввода-вывода может просто продолжать чтение и пытаться вставить элементы.
abstract class IoExecutor<T> {
private final BlockingQueue<T> queue;
public IoExecutor(BlockingQueue<T> q) { queue = q; }
public void run() {
while (hasMoreData()) {
T data = readData();
// this will block if the queue is full, so IO will pause
queue.put(data);
}
// put null into queue
queue.put(null);
}
protected boolean hasMoreData();
protected abstract T readData();
}
В результате во время выполнения у вас должно быть всегда запущено 4 потока алгоритма, а также (до) 4 элемента в очереди, ожидающих завершения одного из потоков алгоритма и их захвата.