Создание потока рассылки для безопасного семафора - PullRequest
3 голосов
/ 09 октября 2019

Я пытался создать двоичный семафор, который сможет безопасно блокировать выполнение метода, выполняющегося в потоке диспетчеризации событий (EDT), фактически не блокируя поток от обработки большего количества событий. Поначалу это может показаться невозможным, но Java имеет некоторые встроенные функции, связанные с этим, но я не могу заставить его работать.

Вариант использования

В настоящее время, если вы показываете модальныйсворачивая диалог от EDT, он будет блокировать EDT (потому что ваш метод, который отображал модальное диалоговое окно, не будет переходить на следующую строку, пока диалог не будет закрыт), но на самом деле есть некоторая скрытая магия, которая делает EDTвведите новый цикл обработки событий, который будет продолжать отправлять события до тех пор, пока не закроется модальное диалоговое окно.

В настоящее время у моей команды есть приложения, которые очень медленно мигрируют с Swing на JavaFX (довольно сложный переход), и я хотел иметь возможностьотображать модальные диалоговые окна JavaFX из потока диспетчеризации событий AWT таким же образом, как можно отображать изменяющиеся модальные диалоговые окна. Казалось, что какой-то семафор, безопасный для EDT, будет соответствовать этому варианту использования и, вероятно, пригодится для других целей в будущем.

Подход

java.awt.EventQueue.createSecondaryLoop() - это метод, который создаетSecondaryLoop объект, который затем можно использовать для запуска нового цикла обработки событий. Когда вы вызываете SecondaryLoop.enter(), вызов будет блокироваться, пока он обрабатывает новый цикл событий (обратите внимание, что call блокируется, но поток не блокируется, поскольку он продолжается в событиицикл обработки). Новый цикл обработки событий будет продолжаться до тех пор, пока вы не вызовете SecondaryLoop.exit() (это не совсем верно, см. Мой связанный вопрос SO ).

Итак, я создал семафор, в котором блокирующий вызов для получения результатов приводит к ожиданию защелки для нормального потока или вводу вторичного цикла для EDT. Каждый блокирующий вызов для получения также добавляет разблокирующую операцию, которая вызывается при освобождении семафора (для обычного потока это просто уменьшает защелку, для EDT он выходит из вторичного цикла).

Вот мойcode:


import java.awt.EventQueue;
import java.awt.SecondaryLoop;
import java.awt.Toolkit;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

@SuppressWarnings("serial")
public class EventDispatchThreadSafeBinarySemaphore extends Semaphore{

    /** Operations used to unblock threads when a semaphore is released.
     * Must be a stack because secondary loops have to be exited in the
     * reverse of the order in which they were entered in order to unblock
     * the execution of the method that entered the loop.
     */
    private Stack<Runnable> releaseOperations = new Stack<>();

    private boolean semaphoreAlreadyAcquired = false;


    public EventDispatchThreadSafeBinarySemaphore() {
        super(0);
    }

    @Override
    public boolean isFair() {
        return false;
    }

    @Override
    public void acquire() throws InterruptedException {

        Runnable blockingOperation = () -> {};

        synchronized(this) {
            if(semaphoreAlreadyAcquired) {

                //We didn't acquire the semaphore, need to set up an operation to execute
                //while we're waiting on the semaphore and an operation for another thread
                //to execute in order to unblock us when the semaphore becomes available

                if(EventQueue.isDispatchThread()) {

                    //For the EDT, we don't want to actually block, rather we'll enter a new loop that will continue
                    //processing AWT events.
                    SecondaryLoop temporaryAwtLoop = Toolkit.getDefaultToolkit().getSystemEventQueue().createSecondaryLoop();

                    releaseOperations.add(() -> temporaryAwtLoop.exit());

                    blockingOperation = () -> {

                        if(!temporaryAwtLoop.enter()) {
                            //I don't think we'll run into this, but I'm leaving this here for now for debug purposes
                            System.err.println("Failed to enter event loop");
                        }
                    };
                }
                else {

                    //Non-dispatch thread is a little simpler, we'll just wait on a latch
                    CountDownLatch blockedLatch = new CountDownLatch(1);
                    releaseOperations.add(() -> blockedLatch.countDown());
                    blockingOperation = () -> {
                        try {
                            blockedLatch.await();
                        } catch (InterruptedException e) {
                            //I'll worry about handling this better once I have the basics figured out
                            e.printStackTrace();
                        }
                    };
                }
            }
            else {
                semaphoreAlreadyAcquired = true;
            }
        }

        //This part must be executed outside of the synchronized block so that we don't block
        //the EDT if it tries to acquire the semaphore while this statement is blocked
        blockingOperation.run();

    }

    @Override
    public void release() {
        synchronized(this) {
            if(releaseOperations.size() > 0) {
                //Release the last blocked thread
                releaseOperations.pop().run();
            }
            else {
                semaphoreAlreadyAcquired = false;
            }
        }
    }

}

А вот мой соответствующий тестовый код JUnit (извиняюсь за большой размер, это наименьший минимальный проверяемый пример, который я смог до сих пор придумать):

public class TestEventDispatchThreadSafeBinarySemaphore {

    private static EventDispatchThreadSafeBinarySemaphore semaphore;
        //See /12836963/secondaryloop-enter-ne-blokiruetsya-do-vyzova-exit-v-edt
        //for why we need this timer
        private static Timer timer = new Timer(500, null);
        @BeforeClass
    public static void setupClass() {
        timer.start();
    }

    @Before
    public void setup() {
        semaphore = new EventDispatchThreadSafeBinarySemaphore();
    }
        @AfterClass
    public static void cleanupClass() {
        timer.stop();
    }

        //This test passes just fine
        @Test(timeout = 1000)
    public void testBlockingAcquireReleaseOnEDT() throws InterruptedException {

        semaphore.acquire();

        CountDownLatch edtCodeStarted = new CountDownLatch(1);
        CountDownLatch edtCodeFinished = new CountDownLatch(1);

        SwingUtilities.invokeLater(() -> {
            //One countdown to indicate that this has begun running
            edtCodeStarted.countDown();
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            //This countdown indicates that it has finished running
            edtCodeFinished.countDown();

        });

        //Ensure that the code on the EDT has started
        edtCodeStarted.await();

        assertEquals("Code on original AWT event thread should still be blocked", 1, edtCodeFinished.getCount());

        //Ensure that things can still run on the EDT
        CountDownLatch edtActiveCheckingLatch = new CountDownLatch(1);
        SwingUtilities.invokeLater(() -> edtActiveCheckingLatch.countDown());

        //If we get past this line, then we know that the EDT is live even though the 
        //code in the invokeLater call is blocked
        edtActiveCheckingLatch.await();

        assertEquals("Code on original AWT event thread should still be blocked", 1, edtCodeFinished.getCount());

        semaphore.release();

        //If we get past this line, then the code on the EDT got past the semaphore
        edtCodeFinished.await();
    }

        //This test fails intermittently, but so far only after the previous test was run first
    @Test(timeout = 10000)
    public void testConcurrentAcquiresOnEDT() throws InterruptedException {

        int numThreads =100;

        CountDownLatch doneLatch = new CountDownLatch(numThreads);

        try {
            semaphore.acquire();

            //Queue up a bunch of threads to acquire and release the semaphore
            //as soon as it becomes available
            IntStream.range(0, numThreads)
                    .parallel()
                    .forEach((threadNumber) -> 
                        SwingUtilities.invokeLater(() -> {
                            try {
                                semaphore.acquire();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            finally {
                                semaphore.release();
                                //Count down the latch to indicate that the thread terminated
                                doneLatch.countDown();
                            }
                        })
                    );

            semaphore.release();

            doneLatch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

Проблема

testConcurrentAcquiresOnEDT иногда проходит, а иногда и терпит неудачу. Я верю, что знаю почему. Я копался в исходном коде Java и в WaitDispatchSupport (конкретная реализация SecondaryLoop) цикл в основном продолжает отправлять события до тех пор, пока флаг с именем keepBlockingEDT не будет сброшен. Он проверит это между событиями. Когда я вызываю exit, он сбросит этот флаг и отправит событие, чтобы активировать очередь событий, если она ожидала новых событий. Однако, это не приведет к немедленному завершению метода enter() (и я не думаю, что в любом случае это возможно).

Итак, вот как получается тупик:

  • Основной поток получает семафор
  • Поток EDT пытается получить семафор, но он уже получен, поэтому он:
    • Создает новый вторичный цикл
    • Создает Runnableкоторый выйдет из нового вторичного цикла и вытолкнет его в стек releaseOperations
    • Входит во вторичный цикл, вызывая блокировку выполнения (обратите внимание, что этот последний шаг по необходимости находится вне блока synchronized
  • Основной поток освобождает семафор, что приводит к следующему:
    • Стек releaseOperations извлекается, и он вызывает exit во вторичном цикле
    • При вызове exit устанавливается флаг keepBlockingEDT для этого вторичного цикла, который должен быть установлен в false
  • Вернувшись в EDT, он только что завершил проверку флага keepBlockingEDT(прямо перед тем, как было установлено значение false) ионо извлекает следующее событие.
  • Оказывается, следующее событие - это еще один исполняемый объект, блокирующий семафор, поэтому он пытается получить его
  • . Это создает еще один SecondaryLoop поверхоригинал SecondaryLoop и входит в него
  • К этому моменту у исходного SecondaryLoop уже был снят флаг keepBlockingEDT, и он сможет прекратить блокировку, за исключением того, что в данный момент он заблокирован, выполняя второй SecondaryLoop. У второго SecondaryLoop никогда не будет вызова exit, потому что на самом деле никто не получил семафор прямо сейчас, поэтому мы блокируем навсегда.

Я работал над этим несколько днейи каждая идея, которую я придумаю, является тупиком.

Я считаю, что у меня есть возможное частичное решение, которое состоит в том, чтобы просто не допустить, чтобы одновременно блокировалось более одного потока на семафоре (если другойпоток пытается получить его, я просто сгенерирую исключение IllegalStateException). Я все еще мог бы иметь несколько вторичных циклов, если бы каждый из них использовал свой собственный семафор, но каждый семафор создал бы максимум 1 вторичный цикл. Я думаю, что это сработает, и оно вполне удовлетворит мой наиболее вероятный вариант использования (потому что в основном я просто хочу показать один модальный диалог JavaFX из потока событий). Я просто хотел узнать, есть ли у кого-нибудь другие идеи, потому что я чувствую, что я близок к созданию чего-то очень крутого, но это не совсем работает.

Дайте мне знать, если у вас есть какие-либо идеи. И «Я почти уверен, что это невозможно, и вот почему ...» также является приемлемым ответом.

...