Как прервать поток, чтобы сделать работу, а затем спать после выполнения работы? - PullRequest
0 голосов
/ 02 июня 2019

Я хочу иметь поток, который выполняет некоторые операции ввода-вывода, когда он прерывается основным потоком, а затем возвращается в режим ожидания / ожидания, пока прерывание не будет вызвано снова.

Итак, у меня естьпридумать реализацию, которая, кажется, не работает.Ниже приведен фрагмент кода.

Примечание. Здесь flag - это общедоступная переменная, к которой можно обращаться через класс потока, который находится в основном классе

// in the main function this is how I am calling it
if(!flag) {
    thread.interrupt()
}

//this is how my thread class is implemented
class IOworkthread extends Thread {
@Override
public void run() {
    while(true) {
         try {
             flag = false;
             Thread.sleep(1000);
         } catch (InterruptedException e) {
             flag = true;
             try {
                  // doing my I/O work
             } catch (Exception e1) {
                   // print the exception message
             }
         }
    }
}
}

В приведенном выше фрагментевторой блок try-catch перехватывает исключение InterruptedException.Это означает, что и первый, и второй блоки try-catch перехватывают прерывание.Но я вызвал прерывание только во время первого блока try-catch.

Не могли бы вы помочь мне с этим?

РЕДАКТИРОВАТЬ Если вы чувствуете, что может бытьдругое решение для моей цели, я буду рад узнать об этом :)

Ответы [ 2 ]

0 голосов
/ 04 июня 2019

Это похоже на конструкцию производителя / потребителя. Похоже, что вы ошибаетесь, IO должен управлять алгоритмом. Поскольку вы остаетесь очень абстрактным в том, что на самом деле делает ваш код, мне нужно придерживаться этого.

Допустим, ваш «распределенный алгоритм» работает с данными типа T; это означает, что он может быть описан как Consumer<T> (имя метода в этом интерфейсе accept(T value)). Поскольку он может работать одновременно, вы хотите создать несколько его экземпляров; обычно это делается с помощью ExecutorService. Класс Executors предоставляет хороший набор фабричных методов для его создания, давайте использовать Executors.newFixedThreadPool(parallelism).

Ваш поток "IO" выполняется для создания входных данных для алгоритма, то есть это Supplier<T>. Мы можем запустить его в Executors.newSingleThreadExecutor().

Мы соединяем эти два, используя BlockingQueue<T>; это коллекция FIFO. Поток ввода-вывода помещает элементы, а экземпляры алгоритма убирают следующий, который становится доступным.

Это заставляет всю установку выглядеть примерно так:

void run() {
    int parallelism = 4; // or whatever
    ExecutorService algorithmExecutor = Executors.newFixedThreadPool(parallelism);
    ExecutorService ioExecutor = Executors.newSingleThreadExecutor();

    // this queue will accept up to 4 elements
    // this might need to be changed depending on performance of each
    BlockingQueue<T> queue = new ArrayBlockingQueue<T>(parallelism);
    ioExecutor.submit(new IoExecutor(queue));

    // take element from queue
    T nextElement = getNextElement(queue);
    while (nextElement != null) {
        algorithmExecutor.submit(() -> new AlgorithmInstance().accept(nextElement));
        nextElement = getNextElement(queue);
        if (nextElement == null) break;
    }
    // wait until algorithms have finished running and cleanup
    algorithmExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.YEARS);
    algorithmExecutor.shutdown();
    ioExecutor.shutdown(); // the io thread should have terminated by now already
}

T getNextElement(BlockingQueue<T> queue) {
    int timeOut = 1; // adjust depending on your IO
    T result = null;
    while (true) {
        try {
            result = queue.poll(timeOut, TimeUnits.SECONDS);
        } catch (TimeoutException e) {} // retry indefinetely, we will get a value eventually
    }
    return result;
}

Теперь это не на самом деле ответ на ваш вопрос, потому что вы хотели знать, как поток ввода-вывода может быть уведомлен, когда он может продолжить чтение данных.

Это достигается ограничением BlockingQueue<>, которое не будет принимать элементы после того, как это было достигнуто, то есть поток ввода-вывода может просто продолжать чтение и пытаться вставить элементы.

abstract class IoExecutor<T> {
    private final BlockingQueue<T> queue;
    public IoExecutor(BlockingQueue<T> q) { queue = q; }
    public void run() {
        while (hasMoreData()) {
            T data = readData();
            // this will block if the queue is full, so IO will pause
            queue.put(data);
        }
        // put null into queue
        queue.put(null);
    }
    protected boolean hasMoreData();
    protected abstract T readData();
}

В результате во время выполнения у вас должно быть всегда запущено 4 потока алгоритма, а также (до) 4 элемента в очереди, ожидающих завершения одного из потоков алгоритма и их захвата.

0 голосов
/ 02 июня 2019

Если важно быстро отреагировать на флаг, вы можете попробовать следующее:

class IOworkthread extends Thread {//implements Runnable would be better here, but thats another story
    @Override
    public void run() {
        while(true) {
            try {
                flag = false;
                Thread.sleep(1000);
            }
            catch (InterruptedException e) {
                flag = true;
            }
            //after the catch block the interrupted state of the thread should be reset and there should be no exceptions here
            try {
                // doing I/O work
            }
            catch (Exception e1) {
                // print the exception message
                // here of course other exceptions could appear but if there is no Thread.sleep() used here there should be no InterruptedException in this block
            }
        }
    }
}

Это должно измениться, потому что в блоке catch, когда перехватывается InterruptedException, флаг прерывания потока сбрасывается(в конце блока захвата).

...