Java: Как поток может ожидать несколько объектов? - PullRequest
19 голосов
/ 07 июня 2011

Поток может использовать Object.wait() для блокировки до тех пор, пока другой поток не вызовет notify() или notifyAll() для этого объекта.

Но что, если поток хочет подождать, пока один из нескольких объекты сигнализируются?Например, мой поток должен ждать, пока или а) не станут доступны байты для чтения из InputStream или б) элемент будет добавлен в ArrayList.

Как поток можетждать какого-либо из этих событий?

РЕДАКТИРОВАТЬ

Этот вопрос связан с ожиданием завершения нескольких потоков - в моем случае поток ожидает одного из множества объектовsingnaled.

Ответы [ 8 ]

22 голосов
/ 07 июня 2011

Вас ждет мир боли. Используйте абстракцию более высокого уровня, такую ​​как блокирующая очередь сообщений, из которой поток может использовать такие сообщения, как «доступно больше байтов» или «добавлен элемент».

6 голосов
/ 07 июня 2011

Они все могут использовать один и тот же мьютекс.Вы, потребитель, ожидаете этот мьютекс, и оба других уведомляют этот мьютекс, когда первый может продолжить.

5 голосов
/ 07 июня 2011

Поток не может ожидать более одного объекта одновременно.

Методы wait() и notify() зависят от объекта. Метод wait() приостанавливает текущий поток выполнения и указывает объекту отслеживать приостановленный поток. Метод notify() сообщает объекту, что нужно пробудить приостановленные потоки, которые он отслеживает в данный момент.

Полезная ссылка: Может ли поток вызывать wait () для двух блокировок одновременно в Java (6)?

3 голосов
/ 22 июля 2015

Немного поздно, но это очень интересный вопрос! Казалось бы, вы действительно можете ожидать нескольких условий с одинаковой производительностью и без дополнительных потоков; Это просто вопрос определения проблемы! Я нашел время, чтобы написать более подробное объяснение в коммитах кода ниже. По запросу извлеку абстракцию:

Таким образом, ожидание нескольких объектов - это то же самое, что ожидание в нескольких условиях. Но следующий шаг - объединить ваши подусловия в -net-условие в -single-условие. И когда какой-либо компонент условия заставит его стать истинным, вы переворачиваете логическое значение и уведомляете блокировку (как любое другое условие ожидания-уведомления).

Мой подход :

Для любого условия это может привести только к двум значениям (true и false). Как это значение производится, не имеет значения. В вашем случае ваше «функциональное состояние» - это когда одно из двух значений истинно: (value_a || value_b). Я называю это «функциональное состояние» «Nexus-Point». Если вы применяете точку зрения, что любое сложное условие, независимо от того, насколько оно сложное, всегда приводит к простому результату (истинному или ложному), то на самом деле вы спрашиваете: «Что заставит мое сетевое условие стать истинным?» (Предполагая, что логика «Дождаться истины»). Таким образом, когда поток приводит к тому, что компонент вашего условия становится истинным (в вашем случае для value_a или value_b устанавливается значение true), и вы знаете, что это приведет к выполнению желаемого условия -net-, вы можете упростить подход к классическому (в том, что он переворачивает один логический флаг и снимает блокировку). Используя эту концепцию, вы можете применить объектно-ординатный подход, чтобы помочь прояснить вашу общую логику:

import java.util.HashSet;
import java.util.Set;

/**
 * The concept is that all control flow operation converge
 * to a single value: true or false. In the case of N
 * components in which create the resulting value, the
 * theory is the same. So I believe this is a matter of
 * perspective and permitting 'simple complexity'. for example:
 *
 * given the statement:
 *      while(condition_a || condition_b || ...) { ... }
 *
 * you could think of it as:
 *      let C = the boolean -resulting- value of (condition_a || condition_b || ...),
 *      so C = (condition_a || condition_b || ...);
 *
 * Now if we were to we-write the statement, in lamest-terms:
 *      while(C) { ... }
 *
 * Now if you recognise this form, you'll notice its just the standard
 * syntax for any control-flow statement?
 *
 *      while(condition_is_not_met) {
 *          synchronized (lock_for_condition) {
 *              lock_for_condition.wait();
 *            }
 *      }
 *
 * So in theory, even if the said condition was evolved from some
 * complex form, it should be treated as nothing more then if it
 * was in the simplest form. So whenever a component of the condition,
 * in which cause the net-condition (resulting value of the complex
 * condition) to be met, you would simply flip the boolean and notify
 * a lock to un-park whoever is waiting on it. Just like any standard
 * fashion.
 *
 * So thinking ahead, if you were to think of your given condition as a
 * function whos result is true or false, and takes the parameters of the states
 * in which its comprised of (  f(...) = (state_a || state_b && state_c), for example )
 * then you would recognize "If I enter this state, in which this I know would
 * cause that condition/lock to become true, I should just flip the switch switch,
 * and notify".
 *
 * So in your example, your 'functional condition' is:
 *      while(!state_a && !state_b) {
 *          wait until state a or state b is false ....
 *      }
 *
 * So armed with this mindset, using a simple/assertive form,
 * you would recognize that the overall question:
 * -> What would cause my condition to be true? : if  state_a is true OR state_b is true
 * Ok... So, that means: When state_a or state_b turn true, my overall condition is met!
 * So... I can just simplify this thing:
 *
 *      boolean net_condition = ...
 *      final Object lock = new Lock();
 *
 *      void await() {
 *          synchronized(lock) {
 *              while(!net_condition) {
 *                  lock.wait();
 *              }
 *           }
 *       }
 *
 * Almighty, so whenever I turn state_a true, I should just flip and notify
 * the net_condition!
 *
 *
 *
 * Now for a more expanded form of the SAME THING, just more direct and clear:
 *
 * @author Jamie Meisch
 */
public class Main {


    /**
     *
     * The equivalent if one was to "Wait for one of many condition/lock to
     * be notify me when met" :
     *
     *      synchronized(lock_a,lock_b,lock_c) {
     *          while(!condition_a || !condition_b || !condition_c) {
     *              condition_a.wait();
     *              condition_b.wait();
     *              condition_c.wait();
     *          }
     *      }
     *
     */
    public static void main(String... args) {

        OrNexusLock lock = new OrNexusLock();
        // The workers register themselves as their own variable as part of the overall condition,
        // in which is defined by the OrNuxusLock custom-implement. Which will be true if any of
        // the given variables are true
        SpinningWarrior warrior_a = new SpinningWarrior(lock,1000,5);
        SpinningWarrior warrior_b = new SpinningWarrior(lock,1000,20);
        SpinningWarrior warrior_c = new SpinningWarrior(lock,1000,50);

        new Thread(warrior_a).start();
        new Thread(warrior_b).start();
        new Thread(warrior_c).start();

        // So... if any one of these guys reaches 1000, stop waiting:
        // ^ As defined by our implement within the OrNexusLock


        try {
            System.out.println("Waiting for one of these guys to be done, or two, or all! does not matter, whoever comes first");
            lock.await();
            System.out.println("WIN: " + warrior_a.value() + ":" + warrior_b.value() + ":" + warrior_c.value());
        } catch (InterruptedException ignored) {
        }

    }


    // For those not using Java 8 :)
    public interface Condition {
        boolean value();
    }

    /**
     * A variable in which the net locks 'condition function'
     * uses to determine its overall -net- state.
     */
    public static class Variable {

        private final Object lock;
        private final Condition con;

        private Variable(Object lock, Condition con) {
            this.lock = lock;
            this.con  = con;
        }

        public boolean value() {
            return con.value();
        }

        //When the value of the condition changes, this should be called
        public void valueChanged() {
            synchronized (lock) {
                lock.notifyAll();
            }
        }

    }



    /**
     *
     * The lock has a custom function in which it derives its resulting
     * -overall- state (met, or not met). The form of the function does
     * not matter, but it only has boolean variables to work from. The
     * conditions are in their abstract form (a boolean value, how ever
     * that sub-condition is met). It's important to retain the theory
     * that complex conditions yeild a simple result. So expressing a
     * complex statement such as ( field * 5 > 20 ) results in a simple
     * true or false value condition/variable is what this approach is
     * about. Also by centerializing the overal logic, its much more
     * clear then the raw -simplest- form (listed above), and just
     * as fast!
     */
    public static abstract class NexusLock {
        private final Object lock;

        public NexusLock() {
            lock = new Object();
        }

        //Any complex condition you can fathom!
        //Plus I prefer it be consolidated into a nexus point,
        // and not asserted by assertive wake-ups
        protected abstract boolean stateFunction();

        protected Variable newVariable(Condition condition) {
            return new Variable(lock, condition);
        }

        //Wait for the overall condition to be met
        public void await() throws InterruptedException {
            synchronized (lock) {
                while (!stateFunction()) {
                    lock.wait();
                }
            }
        }

    }

    // A implement in which any variable must be true
    public static class OrNexusLock extends NexusLock {


        private final Set<Variable> vars = new HashSet<>();

        public OrNexusLock() {
        }


        public Variable newVar(Condition con) {
            Variable var = newVariable(con);
            vars.add(var); //register it as a general component of or net condition       // We should notify the thread since our functional-condition has changed/evolved:
            synchronized (lock) { lock.notifyAll(); }
            return var;
        }

        @Override
        public boolean stateFunction() { //Our condition for this lock
            // if any variable is true: if(var_a || var_b || var_c || ...)

            for(Variable var : vars) {
                if(var.value() == true) return true;
            }
            return false;
        }

    }

    //increments a value with delay, the condition is met when the provided count is reached
    private static class SpinningWarrior implements Runnable, Condition {

        private final int count;
        private final long delay;
        private final Variable var;

        private int tick = 0;

        public SpinningWarrior(OrNexusLock lock, int count, long delay) {
            this.var   = lock.newVar(this);
            this.count = count; //What to count to?
            this.delay = delay;
        }

        @Override
        public void run() {
            while (state_value==false) { //We're still counting up!
                tick++;
                chkState();
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ignored) {
                    break;
                }
            }
        }

        /**
         * Though redundant value-change-notification are OK,
         * its best to prevent them. As such its made clear to
         * that we will ever change state once.
         */
        private boolean state_value = false;
        private void chkState() {
            if(state_value ==true) return;
            if(tick >= count) {
                state_value = true;
                var.valueChanged(); //Our value has changed
            }
        }

        @Override
        public boolean value() {
            return state_value; //We could compute our condition in here, but for example sake.
        }

    }


}
2 голосов
/ 07 августа 2015

Похоже, что в вашем случае вас ждут "уведомления" из двух разных источников.Возможно, вам не придется «ждать» (как в обычном java synchronized(object) object.wait()) для этих двух объектов как таковых, но пусть они оба будут общаться с очередью или нет (как упоминают другие ответы, некоторые блокирующие коллекции, такие как LinkedBlockingQueue).

Если вы действительно хотите «ждать» двух разных java-объектов, вы можете сделать это, применив некоторые принципы из этого ответа: https://stackoverflow.com/a/31885029/32453 (по сути, каждый новый потоксделайте ожидание каждого из ожидаемых объектов, попросите их уведомить основной поток, когда сам объект будет уведомлен), но управлять синхронизированными аспектами может быть нелегко.

2 голосов
/ 07 июня 2011

Вы можете ждать только на одном мониторе. Таким образом, уведомители должны уведомить этот один монитор. В этой низкоуровневой синхронизации нет другого пути.

2 голосов
/ 07 июня 2011

Блокировка в обоих случаях над одним и тем же объектом.Вызовите в случае а) или в случае б) notify () для того же объекта.

0 голосов
/ 05 февраля 2016

Для обработки завершения любого потока из данного набора без ожидания завершения всех из них , выделенный общий объект (lastExited ниже) может использоваться как монитор (wait() и notify() в synchronized блоках). Дополнительные мониторы необходимы для обеспечения того, чтобы в любое время выходил не более одного потока (notifyExitMutex) и не более одного потока ожидал выхода любого потока (waitAnyExitMonitor); таким образом, пары wait() / notify() всегда относятся к разным блокам.

Пример (все завершения процесса обрабатываются в порядке завершения потоков ):

import java.util.Random;

public class ThreadMonitor {

    private final Runnable[] lastExited = { null };

    private final Object notifyExitMutex = new Object();
    public void startThread(final Runnable runnable) {
        (new Thread(new Runnable() { public void run() {
            try { runnable.run(); } catch (Throwable t) { }
            synchronized (notifyExitMutex) {
                synchronized (lastExited) {
                    while (true) {
                        try {
                            if (lastExited[0] != null) lastExited.wait();
                            lastExited[0] = runnable;
                            lastExited.notify();
                            return;
                        }
                        catch (InterruptedException e) { }
                    }
                }
            }
        }})).start();
    }

    private final Object waitAnyExitMutex = new Object();
    public Runnable waitAnyExit() throws InterruptedException {
        synchronized (waitAnyExitMutex) {
            synchronized (lastExited) {
                if (lastExited[0] == null) lastExited.wait();
                Runnable runnable = lastExited[0];
                lastExited[0] = null;
                lastExited.notify();
                return runnable;
            }
        }
    }

    private static Random random = new Random();
    public static void main(String[] args) throws InterruptedException {
        ThreadMonitor threadMonitor = new ThreadMonitor();

        int threadCount = 0;
        while (threadCount != 100) {
            Runnable runnable = new Runnable() { public void run() {
                try { Thread.sleep(1000 + random.nextInt(100)); }
                catch (InterruptedException e) { }
            }};
            threadMonitor.startThread(runnable);
            System.err.println(runnable + " started");
            threadCount++;
        }

        while (threadCount != 0) {
            Runnable runnable = threadMonitor.waitAnyExit();
            System.err.println(runnable + " exited");
            threadCount--;
        }
    }
}
...