Сбрасываемый отсчет - PullRequest
16 голосов
/ 06 июля 2011

Мне нужно что-то, что прямо эквивалентно CountDownLatch, но может быть сброшено (оставаясь потокобезопасным!).Я не могу использовать классические конструкции синхронизации, поскольку они просто не работают в этой ситуации (сложные проблемы блокировки).На данный момент я создаю много CountDownLatch объектов, каждый из которых заменяет предыдущий.Я считаю, что это происходит в молодом поколении в GC (из-за огромного количества объектов).Вы можете увидеть код, который использует защелки ниже (это часть макета java.net для интерфейса сетевого симулятора ns-3).

Некоторые идеи могут заключаться в том, чтобы попробовать CyclicBarrier (JDK5 +) или Phaser (JDK7)

Я могу протестировать код и вернуться к любому, кто найдет решение этой проблемы, поскольку я единственный, кто может вставить его в работающую систему, чтобы посмотреть, что произойдет:)

/**
 *
 */
package kokunet;

import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kokuks.IConnectionSocket;
import kokuks.KKSAddress;
import kokuks.KKSSocket;
import kokuks.KKSSocketListener;

/**
 * KSelector
 * @version 1.0
 * @author Chris Dennett
 */
public class KSelector extends SelectorImpl {
    // True if this Selector has been closed
    private volatile boolean closed = false;

    // Lock for close and cleanup
    final class CloseLock {}
    private final Object closeLock = new CloseLock();

    private volatile boolean selecting = false;
    private volatile boolean wakeup = false;

    class SocketListener implements KKSSocketListener {
        protected volatile CountDownLatch latch = null;

        /**
         *
         */
        public SocketListener() {
            newLatch();
        }

        protected synchronized CountDownLatch newLatch() {
            return this.latch = new CountDownLatch(1);
        }

        protected synchronized void refreshReady(KKSSocket socket) {
            if (!selecting) return;

            synchronized (socketToChannel) {
                SelChImpl ch = socketToChannel.get(socket);
                if (ch == null) {
                    System.out.println("ks sendCB: channel not found for socket: " + socket);
                    return;
                }
                synchronized (channelToKey) {
                    SelectionKeyImpl sk = channelToKey.get(ch);
                    if (sk != null) {
                        if (handleSelect(sk)) {
                            latch.countDown();
                        }
                    }
                }
            }
        }
        @Override
        public void connectionSucceeded(KKSSocket socket) {
            refreshReady(socket);
        }
        @Override
        public void connectionFailed(KKSSocket socket) {
            refreshReady(socket);
        }
        @Override
        public void dataSent(KKSSocket socket, long bytesSent) {
            refreshReady(socket);
        }
        @Override
        public void sendCB(KKSSocket socket, long bytesAvailable) {
            refreshReady(socket);
        }
        @Override
        public void onRecv(KKSSocket socket) {
            refreshReady(socket);
        }
        @Override
        public void newConnectionCreated(KKSSocket socket, KKSSocket newSocket, KKSAddress remoteaddress) {
            refreshReady(socket);
        }
        @Override
        public void normalClose(KKSSocket socket) {
            wakeup();
        }
        @Override
        public void errorClose(KKSSocket socket) {
            wakeup();
        }
    }

    protected final Map<KKSSocket, SelChImpl>        socketToChannel = new HashMap<KKSSocket, SelChImpl>();
    protected final Map<SelChImpl, SelectionKeyImpl> channelToKey    = new HashMap<SelChImpl, SelectionKeyImpl>();
    protected final SocketListener currListener = new SocketListener();
    protected Thread selectingThread = null;

    SelChImpl getChannelForSocket(KKSSocket s) {
        synchronized (socketToChannel) {
            return socketToChannel.get(s);
        }
    }

    SelectionKeyImpl getSelKeyForChannel(KKSSocket s) {
        synchronized (channelToKey) {
            return channelToKey.get(s);
        }
    }

    protected boolean markRead(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_READ);
            return selectedKeys.add(impl);
        }
    }

    protected boolean markWrite(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_WRITE);
            return selectedKeys.add(impl);
        }
    }

    protected boolean markAccept(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_ACCEPT);
            return selectedKeys.add(impl);
        }
    }

    protected boolean markConnect(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_CONNECT);
            return selectedKeys.add(impl);
        }
    }

    /**
     * @param provider
     */
    protected KSelector(SelectorProvider provider) {
        super(provider);
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implClose()
     */
    @Override
    protected void implClose() throws IOException {
        provider().getApp().printMessage("implClose: closed: " + closed);
        synchronized (closeLock) {
            if (closed) return;
            closed = true;
            for (SelectionKey sk : keys) {
                provider().getApp().printMessage("dereg1");
                deregister((AbstractSelectionKey)sk);
                provider().getApp().printMessage("dereg2");
                SelectableChannel selch = sk.channel();
                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
            }
            implCloseInterrupt();
        }
    }

    protected void implCloseInterrupt() {
        wakeup();
    }

    private boolean handleSelect(SelectionKey k) {
        synchronized (k) {
            boolean notify = false;

            if (!k.isValid()) {
                k.cancel();
                ((SelectionKeyImpl)k).channel.socket().removeListener(currListener);
                return false;
            }

            SelectionKeyImpl ski = (SelectionKeyImpl)k;

            if ((ski.interestOps() & SelectionKeyImpl.OP_READ) != 0) {
                if (ski.channel.socket().getRxAvailable() > 0) {
                    notify |= markRead(ski);
                }
            }

            if ((ski.interestOps() & SelectionKeyImpl.OP_WRITE) != 0) {
                if (ski.channel.socket().getTxAvailable() > 0) {
                    notify |= markWrite(ski);
                }
            }

            if ((ski.interestOps() & SelectionKeyImpl.OP_CONNECT) != 0) {
                if (!ski.channel.socket().isConnectionless()) {
                    IConnectionSocket cs = (IConnectionSocket)ski.channel.socket();
                    if (!ski.channel.socket().isAccepting() && !cs.isConnecting() && !cs.isConnected()) {
                        notify |= markConnect(ski);
                    }
                }
            }

            if ((ski.interestOps() & SelectionKeyImpl.OP_ACCEPT) != 0) {
                //provider().getApp().printMessage("accept check: ski: " + ski + ", connectionless: " + ski.channel.socket().isConnectionless() + ", listening: " + ski.channel.socket().isListening() + ", hasPendingConn: " + (ski.channel.socket().isConnectionless() ? "nope!" : ((IConnectionSocket)ski.channel.socket()).hasPendingConnections()));
                if (!ski.channel.socket().isConnectionless() && ski.channel.socket().isListening()) {
                    IConnectionSocket cs = (IConnectionSocket)ski.channel.socket();
                    if (cs.hasPendingConnections()) {
                        notify |= markAccept(ski);
                    }
                }
            }
            return notify;
        }
    }

    private boolean handleSelect() {
        boolean notify = false;

        // get initial status
        for (SelectionKey k : keys) {
            notify |= handleSelect(k);
        }

        return notify;
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#doSelect(long)
     */
    @Override
    protected int doSelect(long timeout) throws IOException {
        processDeregisterQueue();

        long timestartedms = System.currentTimeMillis();

        synchronized (selectedKeys) {
            synchronized (currListener) {
                wakeup = false;
                selectingThread = Thread.currentThread();
                selecting = true;
            }
            try {
                handleSelect();

                if (!selectedKeys.isEmpty() || timeout == 0) {
                    return selectedKeys.size();
                }

                //TODO: useless op if we have keys available
                for (SelectionKey key : keys) {
                    ((SelectionKeyImpl)key).channel.socket().addListener(currListener);
                }
                try {
                    while (!wakeup && isOpen() && selectedKeys.isEmpty()) {
                        CountDownLatch latch = null;
                        synchronized (currListener) {
                            if (wakeup || !isOpen() || !selectedKeys.isEmpty()) {
                                break;
                            }
                            latch = currListener.newLatch();
                        }
                        try {
                            if (timeout > 0) {
                                long currtimems = System.currentTimeMillis();
                                long remainingMS = (timestartedms + timeout) - currtimems;

                                if (remainingMS > 0) {
                                    latch.await(remainingMS, TimeUnit.MILLISECONDS);
                                } else {
                                    break;
                                }
                            } else {
                                latch.await();
                            }
                        } catch (InterruptedException e) {

                        }
                    }
                    return selectedKeys.size();
                } finally {
                    for (SelectionKey key : keys) {
                        ((SelectionKeyImpl)key).channel.socket().removeListener(currListener);
                    }
                }
            } finally {
                synchronized (currListener) {
                    selecting = false;
                    selectingThread = null;
                    wakeup = false;
                }
            }
        }
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implRegister(kokunet.SelectionKeyImpl)
     */
    @Override
    protected void implRegister(SelectionKeyImpl ski) {
        synchronized (closeLock) {
            if (closed) throw new ClosedSelectorException();
            synchronized (channelToKey) {
                synchronized (socketToChannel) {
                    keys.add(ski);
                    socketToChannel.put(ski.channel.socket(), ski.channel);
                    channelToKey.put(ski.channel, ski);
                }
            }
        }

    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implDereg(kokunet.SelectionKeyImpl)
     */
    @Override
    protected void implDereg(SelectionKeyImpl ski) throws IOException {
        synchronized (channelToKey) {
            synchronized (socketToChannel) {
                keys.remove(ski);
                socketToChannel.remove(ski.channel.socket());
                channelToKey.remove(ski.channel);

                SelectableChannel selch = ski.channel();

                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
            }
        }
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#wakeup()
     */
    @Override
    public Selector wakeup() {
        synchronized (currListener) {
            if (selecting) {
                wakeup = true;
                selecting = false;
                selectingThread.interrupt();
                selectingThread = null;
            }
        }
        return this;
    }
}

Приветствия,
Крис

Ответы [ 6 ]

22 голосов
/ 06 июля 2011

Я скопировал CountDownLatch и реализовал метод reset(), который сбрасывает внутренний класс Sync в его начальное состояние (начальный счет) :) Кажется, работает нормально. Больше не нужно создавать ненужные объекты \ o / Невозможно создать подкласс, потому что sync был закрытым. Boo.

<code>import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * A synchronization aid that allows one or more threads to wait until
 * a set of operations being performed in other threads completes.
 *
 * <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
 * The {@link #await await} methods block until the current count reaches
 * zero due to invocations of the {@link #countDown} method, after which
 * all waiting threads are released and any subsequent invocations of
 * {@link #await await} return immediately.  This is a one-shot phenomenon
 * -- the count cannot be reset.  If you need a version that resets the
 * count, consider using a {@link CyclicBarrier}.
 *
 * <p>A {@code CountDownLatch} is a versatile synchronization tool
 * and can be used for a number of purposes.  A
 * {@code CountDownLatch} initialized with a count of one serves as a
 * simple on/off latch, or gate: all threads invoking {@link #await await}
 * wait at the gate until it is opened by a thread invoking {@link
 * #countDown}.  A {@code CountDownLatch} initialized to <em>N</em>
 * can be used to make one thread wait until <em>N</em> threads have
 * completed some action, or some action has been completed N times.
 *
 * <p>A useful property of a {@code CountDownLatch} is that it
 * doesn't require that threads calling {@code countDown} wait for
 * the count to reach zero before proceeding, it simply prevents any
 * thread from proceeding past an {@link #await await} until all
 * threads could pass.
 *
 * <p><b>Sample usage:</b> Here is a pair of classes in which a group
 * of worker threads use two countdown latches:
 * <ul>
 * <li>The first is a start signal that prevents any worker from proceeding
 * until the driver is ready for them to proceed;
 * <li>The second is a completion signal that allows the driver to wait
 * until all workers have completed.
 * </ul>
 *
 * <pre>
 * class Driver { // ...
 *   void main() throws InterruptedException {
 *     CountDownLatch startSignal = new CountDownLatch(1);
 *     CountDownLatch doneSignal = new CountDownLatch(N);
 *
 *     for (int i = 0; i < N; ++i) // create and start threads
 *       new Thread(new Worker(startSignal, doneSignal)).start();
 *
 *     doSomethingElse();            // don't let run yet
 *     startSignal.countDown();      // let all threads proceed
 *     doSomethingElse();
 *     doneSignal.await();           // wait for all to finish
 *   }
 * }
 *
 * class Worker implements Runnable {
 *   private final CountDownLatch startSignal;
 *   private final CountDownLatch doneSignal;
 *   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
 *      this.startSignal = startSignal;
 *      this.doneSignal = doneSignal;
 *   }
 *   public void run() {
 *      try {
 *        startSignal.await();
 *        doWork();
 *        doneSignal.countDown();
 *      } catch (InterruptedException ex) {} // return;
 *   }
 *
 *   void doWork() { ... }
 * }
 *
 * 
* *

Другое типичное использование - разделить проблему на N частей, * опишите каждую часть с помощью Runnable, который выполняет эту часть и * считает на защелке и ставит все Runnables в очередь * Исполнитель. Когда все части завершены, координирующий поток * сможет пройти через ждать. (Когда темы должны повторяться * отсчитывать таким образом, вместо этого используйте {@link CyclicBarrier}.) * *

 * class Driver2 { // ...
 *   void main() throws InterruptedException {
 *     CountDownLatch doneSignal = new CountDownLatch(N);
 *     Executor e = ...
 *
 *     for (int i = 0; i < N; ++i) // create and start threads
 *       e.execute(new WorkerRunnable(doneSignal, i));
 *
 *     doneSignal.await();           // wait for all to finish
 *   }
 * }
 *
 * class WorkerRunnable implements Runnable {
 *   private final CountDownLatch doneSignal;
 *   private final int i;
 *   WorkerRunnable(CountDownLatch doneSignal, int i) {
 *      this.doneSignal = doneSignal;
 *      this.i = i;
 *   }
 *   public void run() {
 *      try {
 *        doWork(i);
 *        doneSignal.countDown();
 *      } catch (InterruptedException ex) {} // return;
 *   }
 *
 *   void doWork() { ... }
 * }
 *
 * 
* *

Эффекты согласованности памяти: действия в потоке перед вызовом * {@code countDown ()} * произойдет до * действия после успешного возвращения из соответствующего * {@code await ()} в другом потоке. * * @ с 1,5 * @author Даг Ли * / открытый класс ResettableCountDownLatch { / ** * Синхронизация управления для CountDownLatch. * Использует состояние AQS для представления количества. * / закрытый статический финальный класс Sync extends AbstractQueuedSynchronizer { приватный статический финал long serialVersionUID = 4982264981922014374L; public final int startCount; Sync (int count) { this.startCount = count; SetState (startCount); } int getCount () { return getState (); } public int tryAcquireShared (int acquies) { return getState () == 0? 1: -1; } public boolean tryReleaseShared (int Releases) { // Decrement count; сигнал при переходе на ноль за (;;) { int c = getState (); если (с == 0) вернуть ложь; int nextc = c-1; if (compareAndSetState (c, nextc)) return nextc == 0; } } public void reset () { SetState (startCount); } } приватная финальная синхронизация синхронизации; / ** * Создает {@code CountDownLatch}, инициализированный с заданным количеством. * * @param count, сколько раз {@link #countDown} должен быть вызван * до того, как потоки могут пройти через {@link #await} * @throws IllegalArgumentException, если {@code count} отрицателен * / public ResettableCountDownLatch (int count) { if (count <0) выдает новое IllegalArgumentException ("count <0"); this.sync = новая синхронизация (количество); } / ** * Заставляет текущий поток ждать, пока защелка не обратится к * ноль, если поток не является {@linkplain Thread # прерывание прервано}. * * <p>Если текущий счетчик равен нулю, этот метод сразу возвращается. * *

Если текущий счетчик больше нуля, то текущий * поток становится отключенным для целей планирования потока и лжи * бездействует, пока не произойдет одно из двух: *

  • Счет достигает нуля из-за вызовов * {@link #countDown} метод; или же *
  • Какой-то другой поток {@linkplain Thread # прерывания прерывания} * текущая тема. *
* *

Если текущий поток: *

  • имеет статус прерывания, установленный при входе в этот метод; или же *
  • во время ожидания {@linkplain Thread # interrupt interruptpted} *
* затем {@link InterruptedException} выбрасывается и текущий поток * Прерванный статус очищается. * * @throws InterruptedException, если текущий поток прерывается * во время ожидания * / public void await () выдает InterruptedException { sync.acquireSharedInterruptibly (1); } public void reset () { sync.reset (); } / ** * Заставляет текущий поток ждать, пока защелка не обратится к* ноль, если поток не является {@linkplain Thread # прерывание прервано}, * или указанное время ожидания истекло. * *

Если текущий счетчик равен нулю, этот метод сразу возвращается * со значением {@code true}. * *

Если текущий счетчик больше нуля, то текущий * поток становится отключенным для целей планирования потока и лжи * бездействует, пока не произойдет одно из трех *

  • Счет достигает нуля из-за вызовов * {@link #countDown} метод; или же *
  • Какой-то другой поток {@linkplain Thread # прерывания прерывания} * текущий поток; или же *
  • Указанное время ожидания истекло. *
* *

Если счетчик достигает нуля, метод возвращается с * значение {@code true}. * *

Если текущий поток: *

  • имеет статус прерывания, установленный при входе в этот метод; или же *
  • во время ожидания {@linkplain Thread # прерывание прервано}, *
* затем {@link InterruptedException} выбрасывается и текущий поток * Прерванный статус очищается. * *

Если указанное время ожидания истекло, тогда значение {@code false} * возвращается. Если время меньше или равно нулю, метод * не будет ждать вообще. * * @param timeout максимальное время ожидания * @param unit единица времени аргумента {@code timeout} * @return {@code true}, если счетчик достиг нуля и {@code false} * если время ожидания истекло до того, как счетчик достиг нуля * @throws InterruptedException, если текущий поток прерывается * во время ожидания * / public boolean await (длительный тайм-аут, единица времени) бросает InterruptedException { return sync.tryAcquireSharedNanos (1, unit.toNanos (timeout)); } / ** * Уменьшает количество защелок, освобождая все ожидающие потоки, если * количество достигает нуля. * *

Если текущий счетчик больше нуля, то он уменьшается. * Если новый счетчик равен нулю, то все ожидающие потоки повторно включаются для * планирование потоков. * *

Если текущий счетчик равен нулю, то ничего не происходит. * / public void countDown () { sync.releaseShared (1); } / ** * Возвращает текущий счет. * *

Этот метод обычно используется для отладки и тестирования. * * @ вернуть текущий счет * / public long getCount () { return sync.getCount (); } / ** * Возвращает строку, идентифицирующую эту защелку, а также ее состояние. * Состояние в скобках включает в себя строку {@code "Count ="} * с последующим текущим счетом. * * @ вернуть строку, идентифицирующую эту защелку, а также ее состояние * / public String toString () { return super.toString () + "[Count =" + sync.getCount () + "]"; } }

3 голосов
/ 27 октября 2016

Основываясь на ответе @Fidel -s, я сделал замену для ResettableCountDownLatch. Изменения, которые я сделал

  • mLatch - это private volatile
  • mInitialCount - это private final
  • тип возврата простого await() изменен на void.

В остальном оригинальный код тоже классный. Итак, это полный расширенный код:

public class ResettableCountDownLatch {

    private final int initialCount;
    private volatile CountDownLatch latch;

    public ResettableCountDownLatch(int  count) {
        initialCount = count;
        latch = new CountDownLatch(count);
    }

    public void reset() {
        latch = new CountDownLatch(initialCount);
    }

    public void countDown() {
        latch.countDown();
    }

    public void await() throws InterruptedException {
        latch.await();
    }

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }
}

Обновление

На основе @ Systemplanet -s комментария, вот более безопасная версия reset():

    // An atomic reference is required because reset() is not that atomic anymore, not even with `volatile`.
    private final AtomicReference<CountDownLatch> latchHolder = new AtomicReference<>();

    public void reset() {
        // obtaining a local reference for modifying the required latch
        final CountDownLatch oldLatch = latchHolder.getAndSet(null);
        if (oldLatch != null) {
            // checking the count each time to prevent unnecessary countdowns due to parallel countdowns
            while (0L < oldLatch.getCount()) {
                oldLatch.countDown();
            }
        }
    }

По сути, это выбор между простотой и безопасностью. То есть если вы хотите передать ответственность клиенту за ваш код, то достаточно установить ссылку null в reset().

С другой стороны, если вы хотите облегчить пользователям этого кода, вам нужно использовать немного больше уловок.

2 голосов
/ 13 июня 2018

Я не уверен, что это ошибочно, но недавно у меня возникла та же проблема, и я решил ее, просто создав новый объект CountDownLatch каждый раз, когда мне хотелось сбросить. Примерно так:

официант:

bla();
latch.await();
//now the latch has counted down to 0
blabla();

CountDowner

foo();
latch.countDown();
//now the latch has counted down to 0
latch = new CountDownLatch(1);
Waiter.receiveReferenceToNewLatch(latch);
bar();

Очевидно, что это тяжелая абстракция, но до сих пор она работала для меня и не требует от вас возиться с какими-либо определениями классов.

1 голос
/ 28 апреля 2017

Phaser имеет больше опций, с помощью которых мы можем реализовать сбрасываемый обратный отсчет.

Пожалуйста, ознакомьтесь с основными понятиями ниже со следующих сайтов

https://examples.javacodegeeks.com/core-java/util/concurrent/phaser/java-util-concurrent-phaser-example/

http://netjs.blogspot.in/2016/01/phaser-in-java-concurrency.html

import java.util.concurrent.Phaser;
/**
 * Resettable countdownLatch using phaser
 */
public class PhaserExample {
    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(3); // you can use constructor hint or
                                        // register() or mixture of both
        // register self... so parties are incremented to 4 (3+1) now
        phaser.register();
        //register is one time call for all the phases.
        //means no need to register for every phase             


        int phasecount = phaser.getPhase();
        System.out.println("Phasecount is " + phasecount);
        new PhaserExample().testPhaser(phaser, 2000);
        new PhaserExample().testPhaser(phaser, 4000);
        new PhaserExample().testPhaser(phaser, 6000);

        // similar to await() in countDownLatch/CyclicBarrier
        // parties are decremented to 3 (4+1) now
        phaser.arriveAndAwaitAdvance(); 
        // once all the thread arrived at same level, barrier opens
        System.out.println("Barrier has broken.");
        phasecount = phaser.getPhase();
        System.out.println("Phasecount is " + phasecount);

        //second phase
        new PhaserExample().testPhaser(phaser, 2000);
        new PhaserExample().testPhaser(phaser, 4000);
        new PhaserExample().testPhaser(phaser, 6000);
        phaser.arriveAndAwaitAdvance(); 
        // once all the thread arrived at same level, barrier opens
        System.out.println("Barrier has broken.");
        phasecount = phaser.getPhase();
        System.out.println("Phasecount is " + phasecount);

    }

    private void testPhaser(final Phaser phaser, final int sleepTime) {
        // phaser.register(); //Already constructor hint is given so not
        // required
        new Thread() {
            @Override
            public void run() {
                try {
                    Thread.sleep(sleepTime);
                    System.out.println(Thread.currentThread().getName() + " arrived");
                    // phaser.arrive(); //similar to CountDownLatch#countDown()
                    phaser.arriveAndAwaitAdvance();// thread will wait till Barrier opens
                    // arriveAndAwaitAdvance is similar to CyclicBarrier#await()
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " after passing barrier");
            }
        }.start();
    }
}
0 голосов
/ 16 августа 2018

Похоже, вы хотите включить асинхронный ввод-вывод в синхронный. Вся идея использования асинхронного ввода-вывода состоит в том, чтобы избежать потоков, но CountDownLatch требует использования потоков. Это очевидное противоречие в вашем вопросе. Итак, вы можете:

  • продолжайте использовать потоки и применяйте синхронный ввод-вывод вместо селекторов и суффиксов. Это будет намного проще и надежнее
  • продолжайте использовать асинхронный I / 0 и откажитесь от CountDownLatch. Тогда вам нужна асинхронная библиотека - посмотрите на RxJava, Akka или df4j.
  • продолжайте развивать свой проект для удовольствия. Затем вы можете попробовать использовать java.util.Semaphore вместо CountDownLatch или запрограммировать свой собственный класс синхронизации с помощью synchronized / wait / notify.
0 голосов
/ 12 августа 2015

Еще одна замена для замены

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ResettableCountDownLatch {
    int mInitialCount;
    CountDownLatch mLatch;

    public ResettableCountDownLatch(int  count) {
        mInitialCount = count;
        mLatch = new CountDownLatch(count);
    }

    public void reset() {
        mLatch = new CountDownLatch(mInitialCount);
    }

    public void countDown() {
        mLatch.countDown();
    }

    public boolean await() throws InterruptedException {
        boolean result = mLatch.await();
        return result;
    }

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        boolean result = mLatch.await(timeout, unit);
        return result;
    }
}
...